From 331a021d9aa392160aee4329c8977085581e8293 Mon Sep 17 00:00:00 2001 From: Rafael Lopes Date: Wed, 22 Apr 2026 16:55:44 -0300 Subject: [PATCH] FEAT: Implementado ETL completo para Ruijie e Wifeed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adicionado suporte para extração de dados do Ruijie e WiFeed, incluindo autenticação e tratamento de erros. - Adicionado suporte para watermarking em ambas as fontes para extração incremental. - Criado script de transformação para mesclagem de MAC addresses. - Implementado Backfill para WiFeed, permitindo extração histórica com controle de taxa. - Adicionado script de depuração para testes de transformação do WiFeed. - Desenvolvido scripts de implantação e configurações do Docker para setup de produção. - Criado script de inicialização do schema do PostgreSQL em infra/init.sql. - Adicionado teste automatizado para lógica de transformação e carregamento em test_transform_load.py. - Atualizado documentation para implantação e setup de produção. --- .env.example | 116 ++++++++++++++++-- .gitea/workflows/deploy.yml | 83 +++++++++++++ README.md | 233 +++++++++++++++++++++++++++++------- app/core/config.py | 21 ++-- app/core/db.py | 95 +++++++++++++++ app/extractor/ruijie.py | 109 +++++++++++++++++ app/extractor/wifeed.py | 135 +++++++++++++++++++++ app/load/load_database.py | 83 +++++++++++++ app/main.py | 144 ++++++++++++++++++++++ app/transform/merge_mac.py | 63 ++++++++++ backfill.py | 46 +++++++ debug_wifeed.py | 53 ++++++++ deploy.sh | 88 ++++++++++++++ docker-compose.prod.yml | 36 ++++++ docs/DEPLOY.md | 213 ++++++++++++++++++++++++++++++++ docs/PROD_SETUP.md | 196 ++++++++++++++++++++++++++++++ infra/Dockerfile | 34 ++++++ infra/crontab | 6 + infra/docker-compose.yml | 41 +++++++ infra/entrypoint.sh | 7 ++ infra/init.sql | 116 ++++++++++++++++++ test_transform_load.py | 173 ++++++++++++++++++++++++++ 22 files changed, 2028 insertions(+), 63 deletions(-) create mode 100644 backfill.py create mode 100644 debug_wifeed.py create mode 100644 deploy.sh create mode 100644 docker-compose.prod.yml create mode 100644 docs/DEPLOY.md create mode 100644 docs/PROD_SETUP.md create mode 100644 infra/entrypoint.sh create mode 100644 infra/init.sql create mode 100644 test_transform_load.py diff --git a/.env.example b/.env.example index 8bf3f1b..f9e6017 100644 --- a/.env.example +++ b/.env.example @@ -1,17 +1,113 @@ -# PostgreSQL +# Variáveis de ambiente — WiFi ETL + +Copie para `.env` e preencha conforme seu ambiente. + +## 🔧 Aplicação (obrigatórias) + +```env +# PostgreSQL (pode ser VM separada ou Docker local) DB_HOST=localhost DB_PORT=5432 DB_NAME=wifi_etl DB_USER=postgres -DB_PASSWORD= +DB_PASSWORD=sua_senha_aqui -# API Ruijie (AP) -RUIJIE_BASE_URL= -RUIJIE_API_KEY= +# Ruijie (AP) +RUIJIE_BASE_URL=https://cloud-eu.ruijienetworks.com +RUIJIE_APPID=open52d4899cdbe2 +RUIJIE_SECRET=10493c81e8e94f56b8710d78ed2527c7 +RUIJIE_ACCESS_TOKEN= # opcional — ETL renova automaticamente +RUIJIE_GROUP_ID=9290679 -# API WiFeed (autenticação) -WIFEED_BASE_URL= -WIFEED_API_KEY= +# WiFeed (autenticação usuários) +WIFEED_BASE_URL=https://api.wifeed.com.br +WIFEED_CLIENT_ID=60e40ee2-f39f-4556-8a22-840a2e3fa686 +WIFEED_CLIENT_SECRET=dRpd6FB2hjbyvcA -# ETL config -LOG_LEVEL=INFO \ No newline at end of file +# Logging +LOG_LEVEL=INFO +``` + +--- + +## 🔐 Secrets (Gitea CI/CD) + +No **Gitea**, cadastre em **Settings → Secrets**: + +| Nome | Descrição | +|---|---| +| `SSH_PRIVATE_KEY` | Chave privada SSH para deploy no servidor (arquivo `id_rsa`) | +| `REGISTRY_PASSWORD` | Token do Docker Registry (se usar) | +| `DB_PASSWORD` | Senha do PostgreSQL (pode ser secret) | +| `RUIJIE_ACCESS_TOKEN` | Token Ruijie (opcional — ETL pode renovar) | +| `RUIJIE_SECRET` | Secret Ruijie | +| `WIFEED_CLIENT_SECRET` | Client Secret WiFeed | + +--- + +## 📦 Variáveis (Gitea CI/CD) + +Em **Settings → Variables** (não sensíveis): + +| Nome | Valor | Padrão | +|---|---|---| +| `DB_HOST` | IP/host do banco | — | +| `DB_PORT` | Porta PostgreSQL | `5432` | +| `DB_NAME` | Nome do banco | `wifi_etl` | +| `DB_USER` | Usuário DB | `postgres` | +| `RUIJIE_BASE_URL` | URL base Ruijie | — | +| `RUIJIE_APPID` | AppID Ruijie | — | +| `RUIJIE_SECRET` | Secret Ruijie | — | +| `RUIJIE_GROUP_ID` | Group ID Ruijie | `9290679` | +| `WIFEED_BASE_URL` | URL base WiFeed | — | +| `WIFEED_CLIENT_ID` | Client ID WiFeed | — | +| `LOG_LEVEL` | Nível de log | `INFO` | +| `REGISTRY` | Docker Registry (ex: `ghcr.io/user`) | vazio | +| `REGISTRY_USERNAME` | Username do registry | vazio | +| `SSH_HOST` | IP/host do servidor prod | — | +| `SSH_USER` | Usuário SSH do servidor | — | + +--- + +## 🐳 Docker Compose Override (opcional) + +Crie `docker-compose.override.yml` para dev com PG embutido: + +```yaml +version: '3.8' +services: + postgres: + image: postgres:15-alpine + environment: + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_DB: ${DB_NAME} + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./infra/init.sql:/docker-entrypoint-initdb.d/init.sql:ro + + etl: + build: + context: . + dockerfile: infra/Dockerfile + depends_on: + postgres: + condition: service_healthy + environment: + DB_HOST: postgres + DB_PORT: 5432 + DB_NAME: ${DB_NAME} + DB_USER: ${DB_USER} + DB_PASSWORD: ${DB_PASSWORD} + RUIJIE_*: ${RUIJIE_*} + WIFEED_*: ${WIFEED_*} + volumes: + - etl_logs:/var/log + +volumes: + postgres_data: + etl_logs: +``` + +Use: `docker-compose up -d` (lê `.env` automaticamente). diff --git a/.gitea/workflows/deploy.yml b/.gitea/workflows/deploy.yml index e69de29..86bf87f 100644 --- a/.gitea/workflows/deploy.yml +++ b/.gitea/workflows/deploy.yml @@ -0,0 +1,83 @@ +name: Deploy WiFi-ETL Prod + +on: + push: + branches: + - main + +jobs: + deploy: + runs-on: self-hosted + + steps: + - name: Clone/Update código + run: | + mkdir -p /opt/wifi-etl + cd /opt/wifi-etl + + if [ -d .git ]; then + echo "Atualizando código..." + git pull origin main + else + echo "Clonando projeto..." + git clone https://seu-gitea.com/seu-usuario/wifi-etl.git /opt/wifi-etl + cd /opt/wifi-etl + fi + + echo "✓ Código atualizado" + + - name: Copiar .env + run: | + cp /home/desenvolvimento/.envs/wifi_etl/.env /opt/wifi-etl/.env + echo "✓ .env copiado" + + - name: Build e deploy container + run: | + cd /opt/wifi-etl + docker compose -f infra/docker-compose.yml up -d --build + echo "✓ Container iniciado" + + - name: Aguardar inicialização + run: sleep 5 + + - name: Validar saúde - CRON + run: | + echo "--- Verificando CRON ---" + docker exec wifi_etl_worker ps aux | grep cron || echo "⚠ Cron pode não estar rodando" + + - name: Validar saúde - Database + run: | + echo "--- Verificando conexão com banco ---" + docker exec wifi_etl_worker python3 << 'PYEOF' + import psycopg2 + import os + import sys + + try: + conn = psycopg2.connect( + host=os.getenv('DB_HOST'), + port=int(os.getenv('DB_PORT', '5432')), + dbname=os.getenv('DB_NAME'), + user=os.getenv('DB_USER'), + password=os.getenv('DB_PASSWORD'), + connect_timeout=5 + ) + cur = conn.cursor() + cur.execute("SELECT version();") + version = cur.fetchone()[0] + print(f"✓ Banco de dados conectado") + print(f" {version[:60]}") + cur.close() + conn.close() + except Exception as e: + print(f"✗ Erro de conexão: {e}") + sys.exit(1) + PYEOF + + - name: Verificar logs iniciais + if: always() + run: | + echo "--- Últimos logs do container ---" + docker logs --tail 20 wifi_etl_worker || true + + diff --git a/README.md b/README.md index e436aa2..c6aca38 100644 --- a/README.md +++ b/README.md @@ -108,90 +108,231 @@ Crie um `.env` baseado no exemplo: cp .env.example .env ``` -Preencha com suas credenciais: +Preencha com suas credenciais (WiFeed já inclusa no `.env`): ```env -DB_HOST= +# PostgreSQL +DB_HOST=localhost DB_PORT=5432 -DB_NAME= -DB_USER= -DB_PASSWORD= +DB_NAME=wifi_etl +DB_USER=postgres +DB_PASSWORD=sua_senha -RUIJIE_BASE_URL= -RUIJIE_API_KEY= +# Ruijie (AP) +RUIJIE_BASE_URL=https://cloud-eu.ruijienetworks.com +RUIJIE_APPID=seu_appid +RUIJIE_SECRET=seu_secret +RUIJIE_ACCESS_TOKEN= +RUIJIE_GROUP_ID=9290679 -WIFEED_BASE_URL= -WIFEED_API_KEY= +# WiFeed (autenticação usuários) +WIFEED_BASE_URL=https://api.wifeed.com.br +WIFEED_CLIENT_ID=60e40ee2-f39f-4556-8a22-840a2e3fa686 +WIFEED_CLIENT_SECRET=dRpd6FB2hjbyvcA + +LOG_LEVEL=INFO ``` --- -### 3. Subir o ambiente +### 3. **Modo Desenvolvedor (local)** ```bash -docker-compose up --build -d +# a) Crie venv +python -m venv .venv +.venv\Scripts\activate # Windows +# source .venv/bin/activate # Linux/Mac + +# b) Instale deps +pip install -r requirements.txt + +# c) Suba PostgreSQL (Docker) +docker run -d --name wifi-db \ + -e POSTGRES_PASSWORD=$DB_PASSWORD \ + -e POSTGRES_DB=$DB_NAME \ + -p 5432:5432 postgres:15-alpine + +# d) Crie o schema +docker exec -i wifi-db psql -U postgres -d wifi_etl < infra/init.sql + +# e) Rode o ETL (uma vez) +python app/main.py + +# f) (Opcional) Agende localmente +# crontab -e +# */5 * * * * cd /caminho/wifi-etl && /usr/bin/python3 main.py >> /var/log/wifi-etl.log 2>&1 ``` --- -### 4. Execução +### 4. **Modo Produção (Docker Compose)** -O pipeline será executado automaticamente a cada **5 minutos** via cron. +```bash +# Build e sobe Postgres + ETL (ambiente dev completo) +docker-compose up --build -d + +# Logs +docker-compose logs -f etl + +# Executa manualmente (debug) +docker-compose exec etl python /app/main.py +``` --- -## 🗄️ Modelo de Dados (planejado) +### 5. **Deploy Automático (CI/CD)** -### Tabela: users +Ao fazer `git push` na branch `main`, o workflow **`.gitea/workflows/deploy.yml`** dispara automaticamente: -* nome -* cpf -* sexo -* mac +1. **Builda** imagem Docker (`wifi-etl:`) +2. **Salva** como artifact (backup) +3. **Deploy via SSH** no servidor de produção configurado -### Tabela: sessions +#### Configurar Secrets no Gitea -* mac -* access_point -* tempo_conectado -* timestamp +Acesse **Settings → Secrets** do repositório e adicione: + +| Nome | Descrição | +|---|---| +| `SSH_PRIVATE_KEY` | Chave privada SSH para acesso ao servidor | +| `SSH_HOST` | IP/hostname do servidor prod (ex: `etl.prod.example.com`) | +| `SSH_USER` | Usuário SSH (ex: `deploy`) | +| `REGISTRY` | (opcional) Registry Docker (ex: `ghcr.io/seu-user`) | +| `REGISTRY_USERNAME` | (opcional) Username do registry | +| `REGISTRY_PASSWORD` | (opcional) Token/Password do registry | + +#### Variáveis de Ambiente no Servidor + +No servidor de produção, crie `/opt/wifi-etl/.env` com mesmas variáveis do `.env` local (ou use docker-compose override). + +```bash +# No servidor (primeira vez) +mkdir -p /opt/wifi-etl +# Copie arquivos: docker-compose.prod.yml + .env (via scp) + +# Deploy manual (força rebuild) +./deploy.sh prod +``` --- -## 🔄 Processo ETL +### 6. Consultas Úteis -### Extract +```sql +-- Sessões com dados do usuário +SELECT u.name, u.cpf, s.access_point_name, + s.online_time, s.offline_time, + EXTRACT(EPOCH FROM (s.offline_time - s.online_time))/60 AS mins +FROM sessions s +JOIN users u ON u.mac_address = s.mac_address +ORDER BY s.online_time DESC +LIMIT 20; -Coleta dados das APIs Ruijie e WiFeed +-- Sessões por prédio (último dia) +SELECT building_name, COUNT(*) AS total_sessions, + SUM(active_time_ms)/60000 AS total_minutes +FROM sessions +WHERE online_time >= NOW() - INTERVAL '1 day' +GROUP BY building_name; -### Transform - -* Normalização de dados -* Merge pelo MAC Address - -### Load - -Inserção e/ou atualização no PostgreSQL +-- Usuários únicos (ontem) +SELECT COUNT(DISTINCT mac_address) AS usuarios_unicos +FROM sessions +WHERE online_time::date = CURRENT_DATE - 1; +``` --- -## 📦 Tecnologias +## 🏗️ Arquitetura (versão simplificada) -* Python -* Docker -* PostgreSQL -* Requests +```mermaid +flowchart TD + + A[Cron (5min)] --> B[main.py] + + B --> C{Watermark
last_run < 1h?} + C -->|Sim| D[Usa watermark
(delta only)] + C -->|Não| E[Extrai tudo] + + D --> F[Ruijie API
page=1..N] + E --> F + + F --> G[transform_ruijie
normalize_mac] + F --> H[transform_wifeed
normalize_mac + CPF] + + G --> I[PostgreSQL
sessions + users] + H --> I + + I --> J[Update watermarks
last_run_at + last_value] +``` + +--- + +## 📊 Modelo de Dados + +``` +users + id (PK) + mac_address (UNIQUE, normalized) + name, cpf, gender, email, birthdate + client_id (WiFeed), host_type, local_name + created_at, updated_at + +sessions + id (PK) + mac_address (FK → users) + access_point_name, building_name, band, channel + rssi, user_ip, bytes_up/down/total + online_time (TIMESTAMP), offline_time (TIMESTAMP) + active_time_ms + created_at + UNIQUE (mac_address, online_time) ← idempotência + +watermarks + source (PK) -- 'ruijie' | 'wifeed' + last_value -- epoch ms (Ruijie) | 'YYYY-MM-DD' (WiFeed) + last_run_at -- timestamp da última extração +``` + +--- + +## 🔒 Segurança + +- **Credenciais**: apenas no `.env` (nunca commitar) +- **DB em VM separada**: `docker-compose.prod.yml` só sobe ETL; `DB_HOST` aponta para IP externo +- **Watermarks**: evitam vazamento de dados históricos acidental +- **Idempotência**: constraint única impede duplicatas mesmo em retry + +--- + +## 🐛 Troubleshooting + +| Problema | Solução | +|---|---| +| `psycopg2` erro ao conectar | Verifique `DB_HOST`, `DB_PORT`, firewall | +| Token Ruijie 401 | Rode `get_access_token()` manual ou atualize `RUIJIE_ACCESS_TOKEN` | +| Watermark não avança | Verifique `onlineTime` dos registros (deve ser > watermark anterior) | +| MAC não normaliza | Logs Warn — verifique formato de entrada (Cisco vs colon) | +| Docker build lento | Use `--no-cache` se mudar dependências; senão cache funciona | --- ## 📌 Roadmap -* [x] Estrutura inicial do projeto -* [ ] Integração com APIs -* [ ] Implementação do transform (merge por MAC) -* [ ] Persistência no banco -* [ ] Logging e tratamento de erros -* [ ] Deploy automatizado (CI/CD) +- [x] Schema mínimo + watermarks +- [x] Extractor Ruijie com paginação + watermark +- [x] Extractor WiFeed (report/access) +- [x] Transform: normalize_mac + mapeamento campos +- [x] Load: upsert users + insert sessions idempotente +- [x] Docker + cron +- [x] CI/CD (push → deploy) +- [ ] Retry com backoff nas APIs +- [ ] Testes unitários (pytest) +- [ ] Métricas Prometheus --- +## 📝 Licença + +Proprietário — uso interno. + diff --git a/app/core/config.py b/app/core/config.py index 2a0ff7c..347256a 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -3,16 +3,23 @@ import os load_dotenv() -DB_HOST = os.getenv("DB_HOST", "localhost") -DB_PORT = int(os.getenv("DB_PORT", 5432)) +# PostgreSQL +DB_HOST = os.getenv("DB_HOST") +DB_PORT = int(os.getenv("DB_PORT")) DB_NAME = os.getenv("DB_NAME", "wifi_etl") DB_USER = os.getenv("DB_USER", "postgres") -DB_PASSWORD = os.getenv("DB_PASSWORD", "") +DB_PASSWORD = os.getenv("DB_PASSWORD") -RUIJIE_BASE_URL = os.getenv("RUIJIE_BASE_URL") -RUIJIE_API_KEY = os.getenv("RUIJIE_API_KEY") +# Ruijie +RUIJIE_BASE_URL = os.getenv("RUIJIE_BASE_URL", "https://cloud-eu.ruijienetworks.com") +RUIJIE_APPID = os.getenv("RUIJIE_APPID") +RUIJIE_SECRET = os.getenv("RUIJIE_SECRET") +RUIJIE_ACCESS_TOKEN = os.getenv("RUIJIE_ACCESS_TOKEN") +RUIJIE_GROUP_ID = os.getenv("RUIJIE_GROUP_ID") -WIFEED_BASE_URL = os.getenv("WIFEED_BASE_URL") -WIFEED_API_KEY = os.getenv("WIFEED_API_KEY") +# WiFeed (pendente) +WIFEED_BASE_URL = os.getenv("WIFEED_BASE_URL", "") +WIFEED_CLIENT_ID = os.getenv("WIFEED_CLIENT_ID", "") +WIFEED_CLIENT_SECRET = os.getenv("WIFEED_CLIENT_SECRET", "") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") \ No newline at end of file diff --git a/app/core/db.py b/app/core/db.py index e69de29..6670d7e 100644 --- a/app/core/db.py +++ b/app/core/db.py @@ -0,0 +1,95 @@ +import psycopg2 +import psycopg2.extras +import logging +from typing import Optional, Dict, Any +from contextlib import contextmanager + +from app.core.config import ( + DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD +) + +logger = logging.getLogger(__name__) + + +@contextmanager +def get_connection(): + """ + Context manager para conexão PostgreSQL. + Garante fechamento automático e rollback em caso de erro. + """ + conn = None + try: + conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + dbname=DB_NAME, + user=DB_USER, + password=DB_PASSWORD, + connect_timeout=10 + ) + conn.autocommit = False + logger.debug("Conexão PostgreSQL estabelecida") + yield conn + conn.commit() + logger.debug("Transação commitada") + except psycopg2.Error as e: + if conn: + conn.rollback() + logger.error(f"Erro PostgreSQL: {e}") + raise + finally: + if conn: + conn.close() + logger.debug("Conexão PostgreSQL fechada") + + +def execute_query(conn, query: str, params: tuple = None, fetch: bool = False): + """ + Executa uma query SQL e opcionalmente retorna resultados. + + Args: + conn: Conexão psycopg2 + query: SQL query + params: Parâmetros para query parametrizada + fetch: Se True, retorna cursor com resultados + + Returns: + cursor (se fetch=True) ou None + """ + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(query, params) + if fetch: + return cur.fetchall() + return None + + +def execute_many(conn, query: str, params_list: list): + """ + Executa múltiplas inserts/updates em batch. + + Args: + conn: Conexão psycopg2 + query: SQL query com placeholders + params_list: Lista de tuplas de parâmetros + """ + with conn.cursor() as cur: + psycopg2.extras.execute_batch(cur, query, params_list, page_size=1000) + + +def test_connection() -> bool: + """ + Testa conexão com o banco. + + Returns: + True se conectado com sucesso + """ + try: + with get_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + result = cur.fetchone() + logger.info(f"Conexão PostgreSQL OK — versão: {conn.server_version}") + return True + except Exception as e: + logger.error(f"Falha no teste de conexão: {e}") + return False diff --git a/app/extractor/ruijie.py b/app/extractor/ruijie.py index e69de29..32e8d58 100644 --- a/app/extractor/ruijie.py +++ b/app/extractor/ruijie.py @@ -0,0 +1,109 @@ +import requests +import logging +from typing import Tuple, Optional + +from app.core.config import ( + RUIJIE_BASE_URL, + RUIJIE_APPID, + RUIJIE_SECRET, + RUIJIE_ACCESS_TOKEN, + RUIJIE_GROUP_ID, +) + +logger = logging.getLogger(__name__) + +BASE_URL = RUIJIE_BASE_URL + + +def get_access_token() -> str: + url = f"{BASE_URL}/service/api/oauth20/client/access_token?token=d63dss0a81e4415a889ac5b78fsc904a" + payload = {"appid": RUIJIE_APPID, "secret": RUIJIE_SECRET} + resp = requests.post(url, json=payload, timeout=15) + resp.raise_for_status() + token = resp.json().get("data", {}).get("access_token") + if not token: + raise ValueError(f"Token não retornado: {resp.json()}") + return token + + +def refresh_token(access_token: str) -> str: + url = f"{BASE_URL}/service/api/token/refresh?appid={RUIJIE_APPID}&secret={RUIJIE_SECRET}&access_token={access_token}" + resp = requests.get(url, timeout=15) + resp.raise_for_status() + token = resp.json().get("accessToken") or resp.json().get("data", {}).get("access_token") + if not token: + raise ValueError(f"Refresh falhou: {resp.json()}") + return token + + +def get_active_users(access_token: str, page_index: int = 1, page_size: int = 100) -> list[dict]: + url = f"{BASE_URL}/logbizagent/logbiz/api/sta/sta_users" + payload = { + "groupId": int(RUIJIE_GROUP_ID), + "pageSize": page_size, + "pageIndex": page_index, + "staType": "onofflineUserHistory", + } + headers = {"Content-Type": "application/json"} + params = {"access_token": access_token} + + resp = requests.post(url, json=payload, headers=headers, params=params, timeout=15) + resp.raise_for_status() + return resp.json().get("list", []) + + +def extract_all_users( + access_token: str, + watermark_ms: Optional[int] = None, + page_size: int = 100 +) -> Tuple[list[dict], int]: + """ + Extrai sessões Ruijie com suporte a watermark. + + Args: + access_token: Token Ruijie + watermark_ms: onlineTime (epoch ms) da última sessão processada. + Se fornecido, ignora registros com onlineTime <= watermark. + page_size: Tamanho da página + + Returns: + (lista_de_registros, novo_watermark) + - novo_watermark = maior onlineTime encontrado (ou watermark anterior se nada novo) + """ + all_records = [] + page = 1 + max_online_ms = watermark_ms or 0 + stopped_early = False + + while True: + records = get_active_users(access_token, page_index=page, page_size=page_size) + if not records: + break + + # Filtra por watermark (se fornecido) ou adiciona tudo + if watermark_ms is not None: + filtered = [r for r in records if r.get('onlineTime', 0) > watermark_ms] + if len(filtered) < len(records): + # Parou de voltar no tempo — já viu tudo até o watermark + stopped_early = True + all_records.extend(filtered) + break + all_records.extend(filtered) + else: + # Sem watermark: adiciona todos os registros da página + all_records.extend(records) + + # Atualiza max_online_ms + page_max = max((r.get('onlineTime', 0) for r in records), default=0) + if page_max > max_online_ms: + max_online_ms = page_max + + logger.info(f"Ruijie: página {page} → {len(records)} registros (watermark={watermark_ms})") + + if len(records) < page_size or stopped_early: + break + page += 1 + + new_watermark = max_online_ms if max_online_ms > (watermark_ms or 0) else watermark_ms + logger.info(f"Ruijie: total extraído = {len(all_records)} registros (novo watermark={new_watermark})") + return all_records, new_watermark diff --git a/app/extractor/wifeed.py b/app/extractor/wifeed.py index e69de29..a44929d 100644 --- a/app/extractor/wifeed.py +++ b/app/extractor/wifeed.py @@ -0,0 +1,135 @@ +import requests +import logging +import json +from typing import Dict, Any, Optional, Tuple, List +from datetime import date + +# Suprimir warning SSL para requests com verify=False +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +from app.core.config import WIFEED_BASE_URL + +logger = logging.getLogger(__name__) + +BASE_URL = WIFEED_BASE_URL.rstrip('/') if WIFEED_BASE_URL else "" + + +class WiFeedIPBlockedError(Exception): + """Exceção quando WiFeed bloqueia o IP (CrowdSec).""" + pass + + +def get_access_token(client_id: str, client_secret: str) -> str: + """ + Autentica na API WiFeed e retorna o token de acesso. + Token válido por 24 horas. + Baseado em: https://api.wifeed.com.br/auth/api/login + """ + if not client_id or not client_secret: + raise ValueError("WiFeed: clientId e clientSecret são obrigatórios") + + if not BASE_URL: + raise ValueError("WiFeed: WIFEED_BASE_URL não está configurado") + + url = f"{BASE_URL}/auth/api/login" + payload = {"clientId": client_id, "clientSecret": client_secret} + headers = {"Content-Type": "application/json"} + + logger.info(f"WiFeed: autenticando em {url}") + + try: + resp = requests.post(url, json=payload, headers=headers, timeout=15, verify=False) + + # Detecta CrowdSec Ban (bloqueio de IP pela WAF) + if resp.status_code == 403 and "CrowdSec" in resp.text: + logger.error(f"WiFeed: ⛔ IP BLOQUEADO por CrowdSec (WAF da WiFeed)") + logger.error(f"WiFeed: Seu IP foi marcado como suspeito/bloqueado.") + logger.error(f"WiFeed: ➜ Entre em contato com suporte WiFeed (support@wifeed.com.br) para desbloquear") + raise WiFeedIPBlockedError("IP bloqueado pela WiFeed (CrowdSec). Contacte suporte: support@wifeed.com.br") + + resp.raise_for_status() + except WiFeedIPBlockedError: + raise + except requests.exceptions.RequestException as e: + logger.error(f"WiFeed: Erro na requisição de login: {e}") + if hasattr(e, 'response') and e.response is not None: + logger.error(f"WiFeed: Status code: {e.response.status_code}") + logger.error(f"WiFeed: Response body (primeiros 300 chars): {e.response.text[:300]}") + raise + + try: + data = resp.json() + except json.JSONDecodeError: + logger.error(f"WiFeed: Resposta não-JSON. Status: {resp.status_code}") + logger.error(f"WiFeed: Body (primeiros 300 chars): {resp.text[:300]}") + raise ValueError(f"WiFeed: API retornou resposta inválida (não-JSON)") + + # Tenta encontrar o token em diferentes campos possíveis + # WiFeed retorna em data.response.token (estrutura aninhada) + token = ( + data.get("response", {}).get("token") or # Estrutura atual WiFeed + data.get("token") or # Fallback direto + data.get("access_token") or # Fallback alternativo + data.get("Authorization") # Fallback último + ) + + if not token: + logger.error(f"WiFeed: Token não encontrado. Resposta completa: {json.dumps(data, indent=2, default=str)}") + raise ValueError(f"WiFeed: Token não retornado pela API. Chaves: {list(data.keys())}") + + # Remove prefixo "Bearer" se existir + if isinstance(token, str) and token.startswith("Bearer "): + token = token[7:] + + logger.info(f"WiFeed: autenticação bem-sucedida, token vigente por 24h") + return token + + +def extract_all_access( + access_token: str, + watermark_date: Optional[date] = None +) -> Tuple[List[Dict], str]: + """ + Extrai registros de acessos (clientes conectados) do WiFeed. + + Endpoint: GET /core/openapi/v2/report/access?date=YYYY-MM-DD + Retorna lista completa de acessos para a data especificada. + + Args: + access_token: Token Bearer de autenticação + watermark_date: Data a extrair (padrão: hoje) + + Returns: + (lista_de_registros, watermark_date_str) + """ + target_date = watermark_date or date.today() + url = f"{BASE_URL}/core/openapi/v2/report/access" + + logger.info(f"WiFeed: extração de acessos para {target_date.strftime('%Y-%m-%d')}") + + try: + params = {"date": target_date.strftime("%Y-%m-%d")} + headers = {"Authorization": f"Bearer {access_token}"} + + resp = requests.get(url, headers=headers, params=params, timeout=30, verify=False) + resp.raise_for_status() + + records = resp.json() + + # Valida que é uma lista + if not isinstance(records, list): + logger.warning(f"WiFeed: resposta não é lista, tentando extrair 'data' field") + if isinstance(records, dict) and "data" in records: + records = records["data"] + else: + logger.error(f"WiFeed: estrutura inesperada: {type(records)}") + records = [] + + logger.info(f"WiFeed: {len(records)} acessos extraídos para {target_date.strftime('%Y-%m-%d')}") + return records, target_date.strftime("%Y-%m-%d") + + except Exception as e: + logger.error(f"WiFeed: erro durante extração: {e}", exc_info=True) + raise + diff --git a/app/load/load_database.py b/app/load/load_database.py index e69de29..e3230fd 100644 --- a/app/load/load_database.py +++ b/app/load/load_database.py @@ -0,0 +1,83 @@ +import psycopg2 +import logging +from datetime import datetime, timezone +from typing import Optional + +logger = logging.getLogger(__name__) + +INSERT_USER = """ + INSERT INTO users (mac_address, name, cpf, gender, email, birthdate, + phone, client_id, host_type, local_name) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (mac_address) DO UPDATE SET + name = EXCLUDED.name, + cpf = EXCLUDED.cpf, + gender = EXCLUDED.gender, + email = EXCLUDED.email, + birthdate = EXCLUDED.birthdate, + phone = EXCLUDED.phone, + client_id = EXCLUDED.client_id, + host_type = EXCLUDED.host_type, + local_name = EXCLUDED.local_name, + updated_at = NOW(); +""" + +INSERT_SESSION = """ + INSERT INTO sessions ( + mac_address, access_point_name, building_name, + band, channel, rssi, user_ip, + bytes_up, bytes_down, bytes_total, serial_number, + online_time, offline_time, active_time_ms + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (mac_address, online_time) DO NOTHING; +""" + + +def epoch_to_utc(ms: Optional[int]) -> Optional[datetime]: + return datetime.fromtimestamp(ms / 1000, tz=timezone.utc) if ms else None + + +def load(conn, users: list[dict], sessions: list[dict]): + cur = conn.cursor() + + # Users upsert + if users: + user_params = [ + ( + u['mac_address'], u.get('name'), u.get('cpf'), u.get('gender'), + u.get('email'), u.get('birthdate'), u.get('phone'), + u.get('client_id'), u.get('host_type'), u.get('local_name') + ) for u in users + ] + cur.executemany(INSERT_USER, user_params) + logger.info(f"Users upsert: {cur.rowcount} rows") + else: + logger.info(f"Users upsert: 0 rows (nenhum usuário para processar)") + + # Sessions insert (converte epoch → datetime) + if sessions: + session_params = [ + ( + s['mac_address'], + s['access_point_name'], + s['building_name'], + s['band'], + s['channel'], + s['rssi'], + s['user_ip'], + s['bytes_up'], + s['bytes_down'], + s['bytes_total'], + s['serial_number'], + epoch_to_utc(s['online_time_ms']), + epoch_to_utc(s['offline_time_ms']), + s['active_time_ms'] + ) for s in sessions + ] + cur.executemany(INSERT_SESSION, session_params) + logger.info(f"Sessions insert: {cur.rowcount} rows") + else: + logger.info(f"Sessions insert: 0 rows (nenhuma sessão para processar)") + + conn.commit() + cur.close() diff --git a/app/main.py b/app/main.py index e69de29..e5bf447 100644 --- a/app/main.py +++ b/app/main.py @@ -0,0 +1,144 @@ +import logging, sys +from datetime import date, timedelta, datetime, timezone +from typing import Optional, Tuple + +import psycopg2 +from psycopg2.extras import DictCursor + +from app.core.config import ( + RUIJIE_ACCESS_TOKEN, LOG_LEVEL, + WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET, + DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD +) +from app.extractor.ruijie import refresh_token, extract_all_users +from app.extractor.wifeed import get_access_token as wf_token, extract_all_access, WiFeedIPBlockedError +from app.transform.merge_mac import transform_ruijie, transform_wifeed +from app.load.load_database import load + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL, "INFO"), + format="%(asctime)s %(levelname)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] +) +log = logging.getLogger(__name__) + + +# ---------- Watermark helpers ---------- +def get_watermark(conn, source: str) -> Tuple[Optional[str], Optional[datetime]]: + """ + Retorna (last_value, last_run_at) da tabela watermarks. + Se não existir, retorna (None, None). + """ + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute("SELECT last_value, last_run_at FROM watermarks WHERE source = %s", (source,)) + row = cur.fetchone() + if row: + return row['last_value'], row['last_run_at'] + return None, None + + +def set_watermark(conn, source: str, value: str): + """ + Insere ou atualiza watermark para a fonte. + """ + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO watermarks (source, last_value, last_run_at) + VALUES (%s, %s, NOW()) + ON CONFLICT (source) DO UPDATE SET + last_value = EXCLUDED.last_value, + last_run_at = EXCLUDED.last_run_at; + """, (source, value)) + log.info(f"Watermark [{source}] = {value}") + + +def should_use_watermark(last_run_at: Optional[datetime], max_age_hours: int = 1) -> bool: + """ + Porto seguro: se última execução foi há menos de max_age_hours, + USAMOS o watermark (evita re-processar mesmos dados em retry/recorrência). + Se há mais de max_age_hours, IGNORA watermark (força nova extração). + """ + if not last_run_at: + return False + age = datetime.now(timezone.utc) - last_run_at + return age < timedelta(hours=max_age_hours) + + +# ---------- Main ---------- +def run(): + log.info("=== ETL START ===") + + conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD) + conn.autocommit = False + + try: + # === Ruijie === + log.info("--- Ruijie ---") + token_r = refresh_token(RUIJIE_ACCESS_TOKEN) if RUIJIE_ACCESS_TOKEN else __import__('app.extractor.ruijie', fromlist=['get_access_token']).get_access_token() + + # Watermark Ruijie (epoch ms como string) + wm_val_str, last_run = get_watermark(conn, 'ruijie') + use_wm = should_use_watermark(last_run, max_age_hours=1) + wm_ms = int(wm_val_str) if use_wm and wm_val_str else None + + raw_r, new_wm_ms = extract_all_users(token_r, watermark_ms=wm_ms) + log.info(f"Ruijie: raw_r tem {len(raw_r)} registros extraídos") + sessions = [transform_ruijie(r) for r in raw_r] # Sem filtro if + log.info(f"Ruijie: {len(sessions)} sessions transformadas") + log.info(f"Ruijie: {len(sessions)} sessions (wm={'used' if use_wm else 'ignored'})") + + # === WiFeed === + log.info("--- WiFeed ---") + token_w = None + try: + token_w = wf_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET) + except WiFeedIPBlockedError as e: + log.error(f"⛔ WiFeed desabilitado: {e}") + log.error("⏳ Aguarde desbloqueio do IP e tente novamente amanhã") + token_w = None + users = [] + new_wm_date = date.today().strftime("%Y-%m-%d") + + if token_w: + wm_date_str, last_run_w = get_watermark(conn, 'wifeed') + use_wm_w = should_use_watermark(last_run_w, max_age_hours=1) + wm_date = date.fromisoformat(wm_date_str) if use_wm_w and wm_date_str else None + + raw_w, new_wm_date = extract_all_access(token_w, watermark_date=wm_date) + log.info(f"WiFeed: raw_w tem {len(raw_w)} registros extraídos") + users = [transform_wifeed(r) for r in raw_w] # Sem filtro if + log.info(f"WiFeed: {len(users)} users transformados") + log.info(f"WiFeed: {len(users)} users (wm={'used' if use_wm_w else 'ignored'})") + else: + log.info("WiFeed: pulando (IP bloqueado ou não autenticado)") + + # === Load === + load(conn, users, sessions) + log.info("Load OK") + + # === Update watermarks (apenas se avançou) === + # Ruijie: new_wm_ms pode ser None se não teve registros novos + if new_wm_ms is not None: + set_watermark(conn, 'ruijie', str(new_wm_ms)) + else: + log.info("Ruijie: sem registros novos, watermark mantido") + + # WiFeed: atualiza apenas se conseguiu autenticar + if token_w: + set_watermark(conn, 'wifeed', new_wm_date) + else: + log.info("WiFeed: watermark não atualizado (autenticação falhou)") + + conn.commit() + log.info("=== ETL DONE ===") + + except Exception as e: + conn.rollback() + log.error(f"ETL failed: {e}", exc_info=True) + raise + finally: + conn.close() + + +if __name__ == "__main__": + run() diff --git a/app/transform/merge_mac.py b/app/transform/merge_mac.py index e69de29..ad70a8c 100644 --- a/app/transform/merge_mac.py +++ b/app/transform/merge_mac.py @@ -0,0 +1,63 @@ +import logging +logger = logging.getLogger(__name__) + + +def normalize_mac(mac: str) -> str: + if not mac: + return "" + cleaned = mac.replace('.', '').replace('-', '').replace(':', '').upper() + if len(cleaned) == 12 and all(c in '0123456789ABCDEF' for c in cleaned): + return ':'.join(cleaned[i:i+2] for i in range(0, 12, 2)) + return mac.upper() + + +def transform_ruijie(record: dict) -> dict: + """Session Ruijie → dict (mac normalizado + campos).""" + return { + 'mac_address': normalize_mac(record.get('mac', '')), + 'access_point_name': record.get('deviceAliasName', ''), + 'building_name': record.get('buildingName', ''), + 'band': record.get('band', ''), + 'channel': record.get('channel', ''), + 'rssi': int(record.get('rssiInt', 0) or 0), + 'user_ip': record.get('userIp', ''), + 'bytes_up': int(record.get('wifiUp', 0) or 0), + 'bytes_down': int(record.get('wifiDown', 0) or 0), + 'bytes_total': int(record.get('wifiUpDown', 0) or 0), + 'serial_number': record.get('sn', ''), + 'online_time_ms': record.get('onlineTime'), # epoch ms (int) + 'offline_time_ms': record.get('offlineTime'), # epoch ms (int) + 'active_time_ms': int(record.get('activeTime', 0) or 0), + } + + +def transform_wifeed(record: dict) -> dict: + """ + Access WiFeed → dict (usuário + MAC do dispositivo). + + Dados vêm de /v2/report/access e incluem: + - hostMacAddress: MAC do dispositivo + - clientName, clientEmail, clientPhoneNumber: Info do cliente + - clientGender, clientBirthdate: Dados demográficos + - clientExtraFields: CPF e outros extras + - clientId: ID único do cliente + - hostType: Tipo de dispositivo (Android, iPhone, etc) + - localName: Local/network onde conectou + """ + cpf = '' + extra = record.get('clientExtraFields') + if isinstance(extra, dict): + cpf = extra.get('CPF', '') + + return { + 'mac_address': normalize_mac(record.get('hostMacAddress', '')), + 'name': record.get('clientName', ''), + 'email': record.get('clientEmail', ''), + 'cpf': cpf, + 'gender': record.get('clientGender', ''), + 'birthdate': record.get('clientBirthdate'), # string 'YYYY-MM-DD' + 'phone': record.get('clientPhoneNumber', ''), + 'client_id': int(record.get('clientId', 0) or 0), + 'host_type': record.get('hostType', ''), + 'local_name': record.get('localName', ''), + } diff --git a/backfill.py b/backfill.py new file mode 100644 index 0000000..72a4bf9 --- /dev/null +++ b/backfill.py @@ -0,0 +1,46 @@ +import time +from datetime import date, timedelta +import psycopg2 + +from app.extractor.wifeed import get_access_token, extract_all_access +from app.core.config import WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET +from app.core.config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD +from app.transform.merge_mac import transform_wifeed +from app.load.load_database import load + +START_DATE = date(2026, 4, 21) +END_DATE = date.today() - timedelta(days=1) + +SLEEP_BETWEEN = 20 # 20s entre requisições = 3 req/min, bem abaixo do limite + +conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD) +conn.autocommit = False + +token = get_access_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET) +time.sleep(SLEEP_BETWEEN) # pausa já após o login + +current = START_DATE +while current <= END_DATE: + print(f"Processando {current}...") + try: + raw, _ = extract_all_access(token, watermark_date=current) + users = [transform_wifeed(r) for r in raw if transform_wifeed(r)] + if users: + load(conn, users, []) # sessions vazio no backfill + conn.commit() + print(f" {len(users)} usuários inseridos") + else: + print(f" sem dados") + except Exception as e: + conn.rollback() + if "429" in str(e): + print(f" Rate limit — aguardando 90s...") + time.sleep(90) + else: + print(f" ERRO em {current}: {e}") + + current += timedelta(days=1) + time.sleep(SLEEP_BETWEEN) + +conn.close() +print("Backfill concluído!") \ No newline at end of file diff --git a/debug_wifeed.py b/debug_wifeed.py new file mode 100644 index 0000000..7401290 --- /dev/null +++ b/debug_wifeed.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +"""Debug WiFeed transform""" +import requests +import json +from app.core.config import WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET, WIFEED_BASE_URL +from app.extractor.wifeed import get_access_token +from app.transform.merge_mac import transform_wifeed + +print("Autenticando WiFeed...") +try: + token = get_access_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET) + print(f"Token obtido: {token[:50]}...") +except Exception as e: + print(f"Erro ao autenticar: {e}") + exit(1) + +print("\nExtraindo dados de hoje...") +url = f"{WIFEED_BASE_URL.rstrip('/')}/core/openapi/v1/report/record" +params = {"page": 0, "date": "2026-04-22"} +headers = {"Authorization": f"Bearer {token}"} + +try: + resp = requests.get(url, headers=headers, params=params, timeout=30, verify=False) + resp.raise_for_status() + records = resp.json() if isinstance(resp.json(), list) else resp.json().get("records", []) + print(f"Extractos {len(records)} registros") +except Exception as e: + print(f"Erro ao extrair: {e}") + exit(1) + +if records: + print("\n=" * 80) + print("PRIMEIRO REGISTRO BRUTO:") + print("=" * 80) + print(json.dumps(records[0], indent=2)) + + print("\n" + "=" * 80) + print("APLICANDO TRANSFORM:") + print("=" * 80) + transformed = transform_wifeed(records[0]) + print(json.dumps(transformed, indent=2, default=str)) + + if transformed.get('mac_address'): + print(f"\n✓ MAC extraído: {transformed['mac_address']}") + else: + print(f"\n✗ MAC VAZIO - problema na extração!") + + if transformed.get('name'): + print(f"✓ Name extraído: {transformed['name']}") + else: + print(f"✗ Name VAZIO - problema na extração!") +else: + print("Nenhum registro extraído!") diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..9ed17ab --- /dev/null +++ b/deploy.sh @@ -0,0 +1,88 @@ +#!/bin/bash +# WiFi ETL — Deploy Script (Production) +# Uso: ./deploy.sh [prod|staging] +# Exemplo: ./deploy.sh prod + +set -e + +ENV_FILE=".env" +IMAGE_NAME="wifi-etl" +CONTAINER_NAME="wifi-etl-worker" +NETWORK_NAME="wifi-etl-net" + +# Carrega .env +if [ -f "$ENV_FILE" ]; then + export $(grep -v '^#' "$ENV_FILE" | xargs) +else + echo "❌ Arquivo $ENV_FILE não encontrado." + exit 1 +fi + +TARGET=${1:-prod} + +if [ "$TARGET" != "prod" ] && [ "$TARGET" != "staging" ]; then + echo "Uso: $0 [prod|staging]" + exit 1 +fi + +echo "=== Deploy para $TARGET ===" + +# --- 1) Build local --- +echo "→ Buildando imagem Docker..." +docker build -f infra/Dockerfile -t ${IMAGE_NAME}:latest . + +# --- 2) Push para registry (opcional) --- +if [ -n "$REGISTRY" ]; then + echo "→ Tagging & push para $REGISTRY..." + docker tag ${IMAGE_NAME}:latest ${REGISTRY}/${IMAGE_NAME}:latest + docker push ${REGISTRY}/${IMAGE_NAME}:latest +fi + +# --- 3) Envia imagem ao servidor --- +echo "→ Enviando imagem ao servidor $SSH_HOST..." +docker save ${IMAGE_NAME}:latest -o /tmp/wifi-etl-image.tar + +scp -i "$SSH_PRIVATE_KEY" \ + -o StrictHostKeyChecking=no \ + /tmp/wifi-etl-image.tar ${SSH_USER}@${SSH_HOST}:/tmp/ + +# --- 4) Deploy remoto --- +ssh -i "$SSH_PRIVATE_KEY" \ + -o StrictHostKeyChecking=no \ + ${SSH_USER}@${SSH_HOST} << EOF + set -e + echo " → Carregando imagem no servidor..." + docker load -i /tmp/wifi-etl-image.tar + + echo " → Criando network..." + docker network create ${NETWORK_NAME} 2>/dev/null || true + + echo " → Parando container anterior..." + docker stop ${CONTAINER_NAME} 2>/dev/null || true + docker rm ${CONTAINER_NAME} 2>/dev/null || true + + echo " → Iniciando novo container..." + docker run -d \ + --name ${CONTAINER_NAME} \ + --network ${NETWORK_NAME} \ + -e DB_HOST="${DB_HOST}" \ + -e DB_PORT="${DB_PORT:-5432}" \ + -e DB_NAME="${DB_NAME:-wifi_etl}" \ + -e DB_USER="${DB_USER:-postgres}" \ + -e DB_PASSWORD="${DB_PASSWORD}" \ + -e RUIJIE_BASE_URL="${RUIJIE_BASE_URL}" \ + -e RUIJIE_APPID="${RUIJIE_APPID}" \ + -e RUIJIE_SECRET="${RUIJIE_SECRET}" \ + -e RUIJIE_ACCESS_TOKEN="${RUIJIE_ACCESS_TOKEN}" \ + -e RUIJIE_GROUP_ID="${RUIJIE_GROUP_ID:-9290679}" \ + -e WIFEED_BASE_URL="${WIFEED_BASE_URL}" \ + -e WIFEED_CLIENT_ID="${WIFEED_CLIENT_ID}" \ + -e WIFEED_CLIENT_SECRET="${WIFEED_CLIENT_SECRET}" \ + -e LOG_LEVEL="${LOG_LEVEL:-INFO}" \ + ${IMAGE_NAME}:latest + + echo "✅ Deploy OK — Container: \$(docker ps -qf name=${CONTAINER_NAME})" +EOF + +rm -f /tmp/wifi-etl-image.tar +echo "✅ Deploy concluído em $TARGET" diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml new file mode 100644 index 0000000..31a22d9 --- /dev/null +++ b/docker-compose.prod.yml @@ -0,0 +1,36 @@ +version: '3.8' + +services: + etl: + image: wifi-etl:latest # imagem construída pelo CI/CD + container_name: wifi-etl-worker + restart: unless-stopped + environment: + # PostgreSQL (VM separada) + DB_HOST: ${DB_HOST} + DB_PORT: ${DB_PORT:-5432} + DB_NAME: ${DB_NAME:-wifi_etl} + DB_USER: ${DB_USER:-postgres} + DB_PASSWORD: ${DB_PASSWORD} + + # Ruijie + RUIJIE_BASE_URL: ${RUIJIE_BASE_URL} + RUIJIE_APPID: ${RUIJIE_APPID} + RUIJIE_SECRET: ${RUIJIE_SECRET} + RUIJIE_ACCESS_TOKEN: ${RUIJIE_ACCESS_TOKEN} + RUIJIE_GROUP_ID: ${RUIJIE_GROUP_ID:-9290679} + + # WiFeed + WIFEED_BASE_URL: ${WIFEED_BASE_URL} + WIFEED_CLIENT_ID: ${WIFEED_CLIENT_ID} + WIFEED_CLIENT_SECRET: ${WIFEED_CLIENT_SECRET} + + # ETL + LOG_LEVEL: ${LOG_LEVEL:-INFO} + networks: + - wifi-etl-net + # Entrypoint já definido no Dockerfile (executa ETL uma vez + cron -f) + +networks: + wifi-etl-net: + driver: bridge diff --git a/docs/DEPLOY.md b/docs/DEPLOY.md new file mode 100644 index 0000000..2b6e926 --- /dev/null +++ b/docs/DEPLOY.md @@ -0,0 +1,213 @@ +# Deploy WiFi-ETL em Produção + +## Arquitetura + +- **Aplicação**: `/opt/wifi-etl` (clone do repositório) +- **Config**: `.env` copiado de `/home/desenvolvimento/.envs/wifi_etl/.env` +- **Container**: Rodando via `docker compose` (cron + ETL) +- **Runner**: Self-hosted na VM (pull automático na mudança de código) + +## Setup Inicial (uma única vez na VM) + +### 1. Configurar Self-Hosted Runner no Gitea + +Na **VM de produção**: + +```bash +# Criar diretório para o runner +mkdir -p ~/gitea-runner +cd ~/gitea-runner + +# Baixar runner Gitea (Linux x86_64) +# Substitua URL pela sua instância Gitea +wget https://seu-gitea.com/api/v1/repos/seu-usuario/wifi-etl/actions/runners/download/linux_x86_64 + +# Ou use curl: +# curl -LO https://seu-gitea.com/api/v1/repos/seu-usuario/wifi-etl/actions/runners/download/linux_x86_64 + +# Descompactar +unzip linux_x86_64 +chmod +x act_runner + +# Gerar token no Gitea: +# 1. Vá para: Repositório → Settings → Actions → Runners +# 2. Clique "Create Runner" +# 3. Copie o Token gerado + +# Registrar runner (substitua o token) +./act_runner register \ + --instance https://seu-gitea.com \ + --token seu_token_aqui + +# Rodar em background +nohup ./act_runner daemon > runner.log 2>&1 & + +# OU criar systemd service (recomendado): +sudo tee /etc/systemd/system/gitea-runner.service > /dev/null << 'EOF' +[Unit] +Description=Gitea Runner +After=network.target + +[Service] +Type=simple +User=ubuntu +WorkingDirectory=/home/ubuntu/gitea-runner +ExecStart=/home/ubuntu/gitea-runner/act_runner daemon +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target +EOF + +sudo systemctl enable gitea-runner +sudo systemctl start gitea-runner +sudo systemctl status gitea-runner +``` + +### 2. Preparar diretório da aplicação + +```bash +# Criar diretório +mkdir -p /opt/wifi-etl + +# Primeiro clone manual (depois é automático) +cd /opt/wifi-etl +git clone https://seu-gitea.com/seu-usuario/wifi-etl.git . +``` + +## Fluxo de Deploy Automático + +``` +1. git push main (local → Gitea) + ↓ +2. Gitea dispara workflow (push event) + ↓ +3. Runner na VM executa + ↓ +4. git pull /opt/wifi-etl (atualiza código) + ↓ +5. Copiar .env + ↓ +6. docker compose up -d --build + ↓ +7. Validar CRON status + ↓ +8. Testar conexão BD + ↓ +DONE! ✓ +``` + +## Deploy Manual (se necessário) + +```bash +# Na VM +cd /opt/wifi-etl +git pull origin main +cp /home/desenvolvimento/.envs/wifi_etl/.env ./.env +docker compose -f infra/docker-compose.yml up -d --build + +# Aguardar inicialização +sleep 5 +docker logs -f wifi_etl_worker +``` + +## Verificações Pós-Deploy + +```bash +# Status do container +docker ps -a | grep wifi_etl + +# Logs em tempo real +docker logs -f wifi_etl_worker + +# Validar cron rodando +docker exec wifi_etl_worker ps aux | grep cron + +# Testar banco de dados +docker exec wifi_etl_worker python3 << 'EOF' +import psycopg2, os +try: + conn = psycopg2.connect( + host=os.getenv('DB_HOST'), + port=int(os.getenv('DB_PORT', 5432)), + dbname=os.getenv('DB_NAME'), + user=os.getenv('DB_USER'), + password=os.getenv('DB_PASSWORD') + ) + print("✓ Database conectado") + conn.close() +except Exception as e: + print(f"✗ Erro: {e}") +EOF + +# Ver logs de ETL +docker exec wifi_etl_worker tail -50 /var/log/wifi-etl.log +``` + +## Troubleshooting + +### Container não inicia +```bash +docker logs wifi_etl_worker +docker exec wifi_etl_worker env | grep DB_ +``` + +### Cron não funciona +```bash +docker exec -it wifi_etl_worker bash +cat /etc/cron.d/wifi-etl # Verificar arquivo +tail -20 /var/log/syslog # Verificar logs +``` + +### Banco não conecta +```bash +# Verificar variáveis de ambiente +docker exec wifi_etl_worker env | grep DB_ + +# Testar conexão +docker exec wifi_etl_worker python3 -m \ + psycopg2 "postgresql://user:pass@host:port/db" +``` + +### Runner não pegando mudanças +```bash +cd ~/gitea-runner +./act_runner status + +# Ver logs +tail -50 runner.log + +# Restart +pkill act_runner +nohup ./act_runner daemon > runner.log 2>&1 & +``` + +## Revertir Deploy + +```bash +# Parar container +docker-compose -f /opt/wifi-etl/infra/docker-compose.yml down + +# Remover para forçar rebuild +docker rmi wifi-etl:latest + +# Ou voltar para commit anterior +cd /opt/wifi-etl +git reset --hard HEAD~1 +# Depois fazer deploy novamente +``` + +## ETL Schedule + +Roda a cada **5 minutos** em produção: + +``` +*/5 * * * * /usr/local/bin/python /app/main.py +``` + +Logs: `/var/log/wifi-etl.log` (dentro do container) + +--- + +**Last updated**: 2026-04-22 diff --git a/docs/PROD_SETUP.md b/docs/PROD_SETUP.md new file mode 100644 index 0000000..4433ae3 --- /dev/null +++ b/docs/PROD_SETUP.md @@ -0,0 +1,196 @@ +# 🖥️ Setup do Servidor de Produção + +## 1. Sistema Operacional + +Ubuntu 22.04+ ou Debian 12 recomendados. + +```bash +# Atualiza +sudo apt update && sudo apt upgrade -y + +# Instala Docker +sudo apt install -y docker.io docker-compose-plugin + +# Habilita e inicia +sudo systemctl enable docker +sudo systemctl start docker + +# Adiciona usuário deploy ao grupo docker (se usar usuário não-root) +sudo usermod -aG docker $USER +# Faça logout/login ou: newgrp docker +``` + +--- + +## 2. Estrutura de Diretórios + +```bash +# Cria diretório da aplicação +sudo mkdir -p /opt/wifi-etl +sudo chown $USER:$USER /opt/wifi-etl # ou usuário deploy +cd /opt/wifi-etl +``` + +--- + +## 3. SSH Key (para CI/CD acessar) + +No **servidor de produção**, gere uma chave para o deploy (ou use existente): + +```bash +# Como usuário deploy (ou root) +ssh-keygen -t rsa -b 4096 -C "deploy@wifi-etl" -f ~/.ssh/id_rsa_wifi_etl -N "" + +# Exiba a chave pública (adicionar no Gitea como Deploy Key ou user key) +cat ~/.ssh/id_rsa_wifi_etl.pub +# Copie o conteúdo → Gitea Settings → Deploy Keys → Add Key +``` + +**No Gitea (repositório):** +- Settings → Deploy Keys → Add Deploy Key +- Cole a chave pública +- Marque "Allow write access" (para permitir deploy via SSH) + +--- + +## 4. Docker Network (opcional — o compose cria automáticamente) + +```bash +docker network create wifi-etl-net +``` + +--- + +## 5. Banco de Dados (VM separada ou mesmo servidor) + +Se o DB estiver na mesma VM (não recomendado p/ prod): + +```bash +docker run -d \ + --name wifi-db \ + -e POSTGRES_PASSWORD=$DB_PASSWORD \ + -e POSTGRES_DB=$DB_NAME \ + -p 5432:5432 \ + postgres:15-alpine +``` + +Se DB for externo: pule — apenas configure `DB_HOST` no `.env` do ETL. + +--- + +## 6. Variáveis de Ambiente no Servidor + +No servidor, crie `/opt/wifi-etl/.env`: + +```bash +cat > /opt/wifi-etl/.env << 'EOF' +DB_HOST=localhost # ou IP do DB externo +DB_PORT=5432 +DB_NAME=wifi_etl +DB_USER=postgres +DB_PASSWORD=sua_senha_aqui + +RUIJIE_BASE_URL=https://cloud-eu.ruijienetworks.com +RUIJIE_APPID=open52d4899cdbe2 +RUIJIE_SECRET=10493c81e8e94f56b8710d78ed2527c7 +RUIJIE_ACCESS_TOKEN= +RUIJIE_GROUP_ID=9290679 + +WIFEED_BASE_URL=https://api.wifeed.com.br +WIFEED_CLIENT_ID=60e40ee2-f39f-4556-8a22-840a2e3fa686 +WIFEED_CLIENT_SECRET=dRpd6FB2hjbyvcA + +LOG_LEVEL=INFO +EOF + +chmod 600 /opt/wifi-etl/.env +``` + +--- + +## 7. Deploy Inicial (manual) + +Após primeiro push na `main` (CI/CD automático), ou manual: + +```bash +cd /opt/wifi-etl + +# Copie arquivos do repositório (ou o CI/CD faz isso) +# Você precisará de: +# - docker-compose.prod.yml +# - infra/init.sql (schema) + +# 1) Crie schema no DB +# Se DB local: +docker exec -i wifi-db psql -U postgres -d wifi_etl < infra/init.sql + +# Se DB externo: +psql -h $DB_HOST -U $DB_USER -d $DB_NAME -f infra/init.sql + +# 2) Teste imagem local (antes do CI/CD) +docker build -f infra/Dockerfile -t wifi-etl:test . +docker run --rm \ + -e DB_HOST=... -e DB_PASSWORD=... \ + -e RUIJIE_APPID=... -e RUIJIE_SECRET=... \ + -e WIFEED_CLIENT_ID=... -e WIFEED_CLIENT_SECRET=... \ + wifi-etl:test + +# 3) Se OK, o CI/CD fará deploy automático no próximo push na main. +``` + +--- + +## 8. Monitoramento + +```bash +# Logs do container +docker logs wifi-etl-worker -f + +# Estatísticas +docker stats wifi-etl-worker + +# graceful restart +docker restart wifi-etl-worker +``` + +--- + +## 9. Backup do Banco + +```bash +# Diário via cron no servidor do DB +0 2 * * * pg_dump -U postgres wifi_etl > /backup/wifi_etl_$(date +\%Y-\%m-\%d).sql +``` + +--- + +## 10. Troubleshooting + +| Problema | Solução | +|---|---| +| `permission denied` ao conectar SSH | Adicione chave pública ao `~/.ssh/authorized_keys` do usuário deploy | +| Container sobe e para | Verifique logs: `docker logs wifi-etl-worker` — falta `.env` ou erro de conexão DB | +| DBconnection refused | Verifique `DB_HOST`, firewall, `pg_hba.conf` | +| Token Ruijie expirado | Delete `RUIJIE_ACCESS_TOKEN` do `.env` — ETL renova automaticamente | +| Watermark não avança | `onlineTime` deve ser > watermark anterior; verifique formato (epoch ms) | + +--- + +## 📋 Checklist Deploy + +- [ ] Docker instalado no servidor +- [ ] Usuário `deploy` criado (ou use root) +- [ ] SSH key pair gerado; **public key** adicionada ao Gitea Deploy Keys +- [ ] Diretório `/opt/wifi-etl` criado +- [ ] `.env` copiado para `/opt/wifi-etl/` (ou use variáveis do CI/CD) +- [ ] `infra/init.sql` executado no banco +- [ ] Firewall: porta 5432 (DB) acessível do servidor ETL +- [ ] Gitea Variables configuradas (DB, Ruijie, WiFeed) +- [ ] Gitea Secret `SSH_PRIVATE_KEY` configurado + +--- + +Após push em `main`, o workflow Gitea fará: +1. Build → artifact +2. SSH deploy → servidor +3. Container reiniciado diff --git a/infra/Dockerfile b/infra/Dockerfile index e69de29..c2cb6a8 100644 --- a/infra/Dockerfile +++ b/infra/Dockerfile @@ -0,0 +1,34 @@ +# WiFi ETL — Docker Image +FROM python:3.11-slim + +# System deps +RUN apt-get update && apt-get install -y --no-install-recommends \ + cron \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Workdir +WORKDIR /app + +# Python deps +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Code +COPY app ./app +COPY infra/init.sql /docker-entrypoint-initdb.d/ +COPY infra/entrypoint.sh /usr/local/bin/ +RUN chmod +x /usr/local/bin/entrypoint.sh + +# Crontab +COPY infra/crontab /etc/cron.d/wifi-etl +RUN chmod 0644 /etc/cron.d/wifi-etl && \ + crontab /etc/cron.d/wifi-etl + +# Log dir +RUN mkdir -p /var/log && touch /var/log/wifi-etl.log /var/log/wifi-etl-cleanup.log + +# Entrypoint executa ETL uma vez, depois inicia cron +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] +CMD ["cron", "-f"] diff --git a/infra/crontab b/infra/crontab index e69de29..5afbb63 100644 --- a/infra/crontab +++ b/infra/crontab @@ -0,0 +1,6 @@ +# WiFi ETL Pipeline — Cron Jobs +# Formato: MIN HOUR DOM MON DOW COMMAND +# Fuso horário: UTC (ajustar se necessário) + +# A cada 5 minutos: executa o ETL +*/5 * * * * /usr/local/bin/python -m app.main >> /var/log/wifi-etl.log 2>&1 diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml index e69de29..b09db3a 100644 --- a/infra/docker-compose.yml +++ b/infra/docker-compose.yml @@ -0,0 +1,41 @@ +version: '3.8' + +services: + postgres: + image: postgres:15-alpine + container_name: wifi_etl_db + environment: + POSTGRES_DB: wifi_etl + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + + etl: + build: + context: .. + dockerfile: infra/Dockerfile + container_name: wifi_etl_worker + env_file: + - ../.env + depends_on: + postgres: + condition: service_healthy + environment: + DB_HOST: postgres + volumes: + - etl_logs:/var/log + restart: unless-stopped + +volumes: + postgres_data: + etl_logs: diff --git a/infra/entrypoint.sh b/infra/entrypoint.sh new file mode 100644 index 0000000..b62a39d --- /dev/null +++ b/infra/entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +echo "=== WiFi ETL Container Starting ===" +echo "Executando primeira corrida ETL..." +python -m app.main || echo "ETL inicial falhou (ver logs). O cron continuará rodando." +echo "Iniciando cron..." +exec cron -f diff --git a/infra/init.sql b/infra/init.sql new file mode 100644 index 0000000..31d9387 --- /dev/null +++ b/infra/init.sql @@ -0,0 +1,116 @@ +-- WiFi ETL — Schema Mínimo + Watermarks +-- 3 tabelas: watermarks, users, sessions + +-- ----------------------------------------------------- +-- Tabela: watermarks (controle de última extração) +-- ----------------------------------------------------- +CREATE TABLE IF NOT EXISTS watermarks ( + source VARCHAR(50) PRIMARY KEY, -- 'ruijie', 'wifeed' + last_value TEXT, -- valor: epoch ms (Ruijie) ou 'YYYY-MM-DD' (WiFeed) + last_run_at TIMESTAMP WITH TIME ZONE -- quando a extração rodou +); + +-- ----------------------------------------------------- +-- Tabela: users (WiFeed) +-- ----------------------------------------------------- +CREATE TABLE IF NOT EXISTS users ( + id BIGSERIAL PRIMARY KEY, + mac_address VARCHAR(17) UNIQUE NOT NULL, + name VARCHAR(255), + cpf VARCHAR(14), + gender VARCHAR(20), + email VARCHAR(255), + birthdate DATE, + phone VARCHAR(20), + client_id INTEGER, + host_type VARCHAR(50), + local_name VARCHAR(255), + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); + +CREATE INDEX idx_users_mac ON users(mac_address); + +-- ----------------------------------------------------- +-- Tabela: sessions (Ruijie) +-- ----------------------------------------------------- +CREATE TABLE IF NOT EXISTS sessions ( + id BIGSERIAL PRIMARY KEY, + mac_address VARCHAR(17) NOT NULL, + access_point_name VARCHAR(255) NOT NULL, + building_name VARCHAR(255), + band VARCHAR(10), + channel VARCHAR(10), + rssi INTEGER, + user_ip INET, + bytes_up BIGINT, + bytes_down BIGINT, + bytes_total BIGINT, + serial_number VARCHAR(100), + online_time TIMESTAMP NOT NULL, + offline_time TIMESTAMP NOT NULL, + active_time_ms INTEGER, + created_at TIMESTAMP DEFAULT NOW(), + + CONSTRAINT uq_session UNIQUE (mac_address, online_time) +); + +CREATE INDEX idx_sessions_mac ON sessions(mac_address); +CREATE INDEX idx_sessions_online ON sessions(online_time DESC); + +-- ----------------------------------------------------- +-- Função simples: update_updated_at_colummn +-- ----------------------------------------------------- +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Triggers +CREATE TRIGGER update_users_updated_at + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); + +-- Views + +CREATE OR REPLACE VIEW vw_sessions_enriched AS +SELECT + -- Identificação + u.mac_address, + u.name, + u.cpf, + u.gender, + u.email, + u.birthdate, + u.phone, + u.host_type, + u.local_name, + + -- Sessão + s.id AS session_id, + s.access_point_name, + s.building_name, + s.band, + s.channel, + s.rssi, + s.user_ip, + s.online_time, + s.offline_time, + s.active_time_ms, + ROUND(s.active_time_ms / 1000.0) AS active_time_sec, + ROUND(s.active_time_ms / 60000.0, 1) AS active_time_min, + + -- Tráfego em KB + ROUND(s.bytes_up / 1024.0, 2) AS kb_up, + ROUND(s.bytes_down / 1024.0, 2) AS kb_down, + ROUND(s.bytes_total / 1024.0, 2) AS kb_total, + + s.serial_number, + s.created_at AS session_created_at + +FROM sessions s +LEFT JOIN users u ON u.mac_address = s.mac_address; \ No newline at end of file diff --git a/test_transform_load.py b/test_transform_load.py new file mode 100644 index 0000000..d0961b6 --- /dev/null +++ b/test_transform_load.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +""" +Script para testar transform + load com dados mockados Ruijie e WiFeed. +Não precisa de API externas, testa apenas a lógica de transformação e persistência. +""" + +import sys +import psycopg2 +from datetime import datetime, timezone + +from app.core.config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD +from app.transform.merge_mac import transform_ruijie, transform_wifeed +from app.load.load_database import load + +print("=" * 80) +print("TEST: Transform + Load") +print("=" * 80) + +# Dados mock Ruijie (sessões) +mock_ruijie_records = [ + { + "mac": "AA:BB:CC:DD:EE:FF", + "deviceAliasName": "AP-01", + "buildingName": "Prédio A", + "band": "5GHz", + "channel": 36, + "rssiInt": -45, + "userIp": "192.168.1.100", + "wifiUp": 1000000, + "wifiDown": 5000000, + "wifiUpDown": 6000000, + "sn": "SN001", + "onlineTime": 1640000000000, # epoch ms + "offlineTime": 1640001000000, # epoch ms + "activeTime": 500000, + }, + { + "mac": "11:22:33:44:55:66", + "deviceAliasName": "AP-02", + "buildingName": "Prédio B", + "band": "2.4GHz", + "channel": 6, + "rssiInt": -60, + "userIp": "192.168.1.101", + "wifiUp": 2000000, + "wifiDown": 10000000, + "wifiUpDown": 12000000, + "sn": "SN002", + "onlineTime": 1640100000000, + "offlineTime": 1640101000000, + "activeTime": 600000, + } +] + +# Dados mock WiFeed (usuários) +mock_wifeed_records = [ + { + "hostMacAddress": "AA:BB:CC:DD:EE:FF", + "clientName": "João Silva", + "clientEmail": "joao@example.com", + "clientExtraFields": {"CPF": "12345678901"}, + "clientGender": "M", + "clientBirthdate": "1990-01-15", + "clientPhoneNumber": "1198765432", + "clientId": 1001, + "hostType": "mobile", + "localName": "João Mobile", + }, + { + "hostMacAddress": "11:22:33:44:55:66", + "clientName": "Maria Santos", + "clientEmail": "maria@example.com", + "clientExtraFields": {"CPF": "98765432101"}, + "clientGender": "F", + "clientBirthdate": "1985-05-20", + "clientPhoneNumber": "1187654321", + "clientId": 1002, + "hostType": "desktop", + "localName": "Maria Desktop", + } +] + +print("\n1️⃣ Transformando dados Ruijie...") +sessions = [] +for record in mock_ruijie_records: + transformed = transform_ruijie(record) + if transformed: + sessions.append(transformed) + print(f" ✓ MAC {transformed['mac_address']} → {len(sessions)} sessões") + +print(f"\n Total sessões: {len(sessions)}") +print(f" Exemplo: {sessions[0] if sessions else 'N/A'}") + +print("\n2️⃣ Transformando dados WiFeed...") +users = [] +for record in mock_wifeed_records: + transformed = transform_wifeed(record) + if transformed: + users.append(transformed) + print(f" ✓ MAC {transformed['mac_address']} → {len(users)} usuários") + +print(f"\n Total usuários: {len(users)}") +print(f" Exemplo: {users[0] if users else 'N/A'}") + +print("\n3️⃣ Conectando ao banco de dados...") +try: + conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + dbname=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + conn.autocommit = False + print(f" ✓ Conectado a {DB_HOST}:{DB_PORT}/{DB_NAME}") +except Exception as e: + print(f" ✗ Erro de conexão: {e}") + sys.exit(1) + +print("\n4️⃣ Carregando dados no banco (transform + load)...") +try: + load(conn, users, sessions) + print(" ✓ Load concluído com sucesso") +except Exception as e: + conn.rollback() + print(f" ✗ Erro no load: {e}") + import traceback + traceback.print_exc() + sys.exit(1) +finally: + conn.close() + +print("\n5️⃣ Verific ando dados no banco...") +try: + conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + dbname=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + cur = conn.cursor() + + cur.execute("SELECT COUNT(*) FROM users;") + user_count = cur.fetchone()[0] + print(f" ✓ Users: {user_count} registros") + + cur.execute("SELECT COUNT(*) FROM sessions;") + session_count = cur.fetchone()[0] + print(f" ✓ Sessions: {session_count} registros") + + cur.execute("SELECT mac_address, name FROM users LIMIT 3;") + rows = cur.fetchall() + print(f" ✓ Amostra Users:") + for mac, name in rows: + print(f" - {mac}: {name}") + + cur.execute("SELECT mac_address, access_point_name FROM sessions LIMIT 3;") + rows = cur.fetchall() + print(f" ✓ Amostra Sessions:") + for mac, ap in rows: + print(f" - {mac}: {ap}") + + conn.close() +except Exception as e: + print(f" ✗ Erro ao verificar: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + +print("\n" + "=" * 80) +print("✅ TEST PASSED: Transform + Load funcionando corretamente!") +print("=" * 80)