fusion-registration-hunter/coleta_registro.py
Rafael Lopes 01ffd019f8
All checks were successful
Deploy corp02 / deploy (push) Successful in 1s
Deploy corp03 / deploy (push) Successful in 2s
Deploy corp04 / deploy (push) Successful in 21s
FEAT: Adiciona funções para identificar dispositivos físicos, atualiza geração de relatório semanal com consolidado
2026-04-15 11:38:30 -03:00

438 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import csv
import os
import json
from datetime import datetime, timedelta
import psycopg2
def log(message):
    """Print ``message`` to standard output, prefixed with a local timestamp.

    The emitted line has the form ``[YYYY-MM-DD HH:MM:SS] message``.
    """
    print("[{:%Y-%m-%d %H:%M:%S}] {}".format(datetime.now(), message))
def load_env(path=None):
    """Load environment variables from a simple ``KEY=VALUE`` .env file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Variables that already exist in ``os.environ`` are never
    overwritten. A value wrapped in one matching pair of single or double
    quotes has that pair removed; any other quote character is kept, so a
    value that merely starts or ends with a quote is not mangled.

    Args:
        path: Path of the file to read. When ``None``, the ``.env`` file
            located next to this script is used.

    Returns:
        None. A missing or unreadable file is only logged.
    """
    if path is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(script_dir, ".env")
    if not os.path.exists(path):
        log(f"Aviso: arquivo .env não encontrado em {path}")
        return
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                value = value.strip()
                # Only unwrap a matching pair of quotes; stripping each quote
                # character unconditionally would eat legitimate characters.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                if key and key not in os.environ:
                    os.environ[key] = value
    except Exception as e:
        # Loading the file is best-effort: report the problem and carry on
        # with whatever is already in the environment.
        log(f"Aviso: erro ao carregar .env: {e}")
def get_db_connection():
    """Open a new Postgres connection configured from environment variables.

    Reads ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and
    ``DB_NAME``; the fallbacks are ``localhost``, ``5432``, ``fusionpbx``,
    an empty password and ``fusionpbx`` respectively.

    Returns:
        The object returned by ``psycopg2.connect``.
    """
    env = os.environ.get
    settings = {
        "host": env("DB_HOST", "localhost"),
        "port": int(env("DB_PORT", "5432")),
        "user": env("DB_USER", "fusionpbx"),
        "password": env("DB_PASSWORD", ""),
        "database": env("DB_NAME", "fusionpbx"),
    }
    return psycopg2.connect(**settings)
def get_registrations():
    """Run ``fs_cli`` and return the raw SIP registration listing as text.

    The executable, the sofia profile and the timeout in seconds come from
    the ``FS_CLI_PATH``, ``FS_PROFILE`` and ``FS_CLI_TIMEOUT`` environment
    variables (defaults: ``fs_cli``, ``internal``, ``10``).

    Returns:
        The command's standard output, or an empty string when the command
        exits with a non-zero status, times out or cannot be started. Each
        failure is logged.
    """
    try:
        executable = os.getenv("FS_CLI_PATH", "fs_cli")
        profile = os.getenv("FS_PROFILE", "internal")
        timeout_seconds = int(os.getenv("FS_CLI_TIMEOUT", "10"))
        command = [executable, "-x", "sofia status profile {} reg".format(profile)]
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
        if completed.returncode == 0:
            return completed.stdout
        log(f"Erro ao executar o comando fs_cli: {completed.stderr.strip()}")
    except subprocess.TimeoutExpired:
        log("Timeout ao executar fs_cli")
    except Exception as e:
        log(f"Erro ao executar subprocesso: {e}")
    # Every failure path falls through to the same empty result.
    return ""
def parse_registrations(output):
    """Parse the registration listing text into a list of record dicts.

    A new record starts at every ``Call-ID:`` line. Within a record the
    ``User:``, ``Auth-Realm:``, ``Agent:`` and ``Status:`` lines are read;
    every other line is ignored.

    Args:
        output: Text produced by the registration listing command. ``None``
            or an empty string yields an empty list.

    Returns:
        A list of dicts holding whichever of these keys were seen:
        ``"Call-ID"``, ``"User"`` (part before ``@``), ``"Domain"`` (part
        after ``@``, else the Auth-Realm, else ``"unknown"``), ``"Agent"``
        and ``"Status"`` (``"Registrado desde <EXP value>"`` or
        ``"Sem registro"``).
    """
    registros = []
    current_record = {}
    for raw_line in (output or "").splitlines():
        key, sep, value = raw_line.strip().partition(":")
        if not sep:
            continue
        value = value.strip()
        if key == "Call-ID":
            # A Call-ID line opens a new record; flush the previous one.
            if current_record:
                registros.append(current_record)
            current_record = {"Call-ID": value}
        elif key == "User":
            current_record["User"] = value.split("@")[0]
            current_record["Domain"] = value.split("@")[1] if "@" in value else "unknown"
        elif key == "Auth-Realm":
            # The realm is only a fallback for a domain the User line lacked.
            if current_record.get("Domain", "unknown") == "unknown":
                current_record["Domain"] = value
        elif key == "Agent":
            current_record["Agent"] = value
        elif key == "Status":
            if "Registered" in value:
                _, found, tail = value.partition("EXP(")
                # Take the text up to the closing parenthesis; when it is
                # missing keep the whole remainder instead of truncating it.
                exp_time = tail.split(")", 1)[0] if found else "desconhecido"
                current_record["Status"] = f"Registrado desde {exp_time}"
            else:
                current_record["Status"] = "Sem registro"
    if current_record:
        registros.append(current_record)
    return registros
def get_all_users_from_db():
    """Fetch every extension together with its domain from the database.

    Returns:
        A list of dicts with the keys ``"User"`` (the extension),
        ``"DomainUUID"`` and ``"Domain"`` (the domain name). When the
        connection or the query fails the error is logged and an empty
        list is returned.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        query = """
            SELECT v_extensions.extension, v_extensions.domain_uuid, v_domains.domain_name
            FROM v_extensions
            JOIN v_domains ON v_extensions.domain_uuid = v_domains.domain_uuid
        """
        cursor.execute(query)
        results = cursor.fetchall()
        return [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results]
    except Exception as e:
        log(f"Erro ao consultar o banco de dados: {e}")
        return []
    finally:
        # Close each resource on its own so that a failing cursor.close()
        # can no longer leave the connection open.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log(f"Aviso: erro ao fechar recurso do banco de dados: {e}")
def get_domain_mapping():
    """Fetch the name and description of every domain, keyed by domain UUID.

    Returns:
        A dict mapping ``domain_uuid`` to ``{"name": ..., "description": ...}``.
        An empty or NULL description is replaced by ``"unknown"``. When the
        connection or the query fails the error is logged and an empty dict
        is returned.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        query = "SELECT domain_uuid, domain_name, domain_description FROM v_domains"
        cursor.execute(query)
        results = cursor.fetchall()
        return {
            row[0]: {"name": row[1], "description": row[2] or "unknown"}
            for row in results
        }
    except Exception as e:
        log(f"Erro ao consultar o banco de dados para domínios: {e}")
        return {}
    finally:
        # Close each resource on its own so that a failing cursor.close()
        # can no longer leave the connection open.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log(f"Aviso: erro ao fechar recurso do banco de dados: {e}")
def get_trunks_by_domain():
    """Fetch the enabled gateways (trunks) grouped by domain UUID.

    Returns:
        A dict mapping ``domain_uuid`` to the list of gateway names read
        from ``v_gateways`` rows whose ``enabled`` column is ``'true'``.
        When the connection or the query fails the error is logged and an
        empty dict is returned.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        query = "SELECT domain_uuid, gateway FROM v_gateways WHERE enabled = 'true'"
        cursor.execute(query)
        results = cursor.fetchall()
        trunks_by_domain = {}
        for domain_uuid, gateway in results:
            trunks_by_domain.setdefault(domain_uuid, []).append(gateway)
        return trunks_by_domain
    except Exception as e:
        log(f"Erro ao consultar os troncos no banco de dados: {e}")
        return {}
    finally:
        # Close each resource on its own so that a failing cursor.close()
        # can no longer leave the connection open.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log(f"Aviso: erro ao fechar recurso do banco de dados: {e}")
def get_destinations_by_domain():
    """Fetch the inbound destinations (DDR) per domain, without duplicates.

    Returns:
        A dict mapping ``domain_uuid`` to a sorted list of strings of the
        form ``"<number> (Habilitado)"`` or ``"<number> (Desabilitado)"``.
        When the connection or the query fails the error is logged and an
        empty dict is returned.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        query = "SELECT domain_uuid, destination_number, destination_enabled FROM v_destinations"
        cursor.execute(query)
        results = cursor.fetchall()
        destinations_by_domain = {}
        for domain_uuid, destination_number, enabled in results:
            # Compare on the textual form so the flag is recognised whether
            # the driver hands back the text 'true' or a Python bool.
            # NOTE(review): the column type is not visible here — confirm.
            is_enabled = str(enabled).lower() == "true"
            entry = f"{destination_number} ({'Habilitado' if is_enabled else 'Desabilitado'})"
            # A set removes repeated number/state pairs within a domain.
            destinations_by_domain.setdefault(domain_uuid, set()).add(entry)
        return {
            domain_uuid: sorted(entries)
            for domain_uuid, entries in destinations_by_domain.items()
        }
    except Exception as e:
        log(f"Erro ao consultar os destinos no banco de dados: {e}")
        return {}
    finally:
        # Close each resource on its own so that a failing cursor.close()
        # can no longer leave the connection open.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                log(f"Aviso: erro ao fechar recurso do banco de dados: {e}")
def merge_registered_and_unregistered(all_users, registered_users):
    """Combine the database extensions with their live registration state.

    Args:
        all_users: Dicts with the keys ``"User"``, ``"DomainUUID"`` and
            ``"Domain"`` (domain name), one per extension.
        registered_users: Parsed registration records. Records lacking a
            ``"User"`` are ignored, and a missing ``"Status"`` or
            ``"Agent"`` falls back to the unregistered defaults instead of
            raising ``KeyError``.

    Returns:
        One dict per entry of ``all_users`` with the keys ``"User"``,
        ``"Domain"`` (holding the domain UUID), ``"Status"`` and
        ``"Agent"``. Extensions without a matching registration get
        ``"Sem registro"`` and an empty agent.
    """
    # Registrations are matched on (extension, domain name); when the same
    # pair appears more than once the last record wins.
    registered_map = {
        (reg["User"], reg.get("Domain")): reg
        for reg in registered_users
        if reg.get("User") is not None
    }
    merged_users = []
    for user in all_users:
        reg = registered_map.get((user["User"], user["Domain"])) or {}
        merged_users.append({
            "User": user["User"],
            "Domain": user["DomainUUID"],
            "Status": reg.get("Status", "Sem registro"),
            "Agent": reg.get("Agent", ""),
        })
    return merged_users
def save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain):
    """Write today's registrations to one CSV file per domain.

    Files go to ``csv_registrations/<YYYYMMDD>/`` next to this script and
    are named after the domain description with ``/``, ``\\`` and spaces
    replaced by ``_``. When several domains in the same run would end up
    with the same file name (for instance every domain without a
    description), the domain UUID is appended to each of them so that no
    file overwrites another.

    Args:
        all_users_with_status: Dicts with ``"User"``, ``"Domain"`` (domain
            UUID), ``"Status"`` and ``"Agent"``.
        domain_mapping: ``domain_uuid`` -> ``{"name", "description"}``.
        trunks_by_domain: ``domain_uuid`` -> list of gateway names.
        destinations_by_domain: ``domain_uuid`` -> list of DDR strings.

    Returns:
        The directory the files were written to.
    """
    today = datetime.now().strftime("%Y%m%d")
    output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "csv_registrations", today)
    os.makedirs(output_dir, exist_ok=True)

    grouped_by_domain = {}
    for registro in all_users_with_status:
        domain_uuid = registro.get("Domain", "unknown")
        grouped_by_domain.setdefault(domain_uuid, []).append(registro)

    unknown_domain = {"name": "unknown", "description": "unknown"}

    def sanitize(text):
        # Keep path separators and spaces out of the file name.
        return text.replace("/", "_").replace("\\", "_").replace(" ", "_")

    # Work out every file name up front to detect domains that would collide.
    base_names = {
        domain_uuid: sanitize(domain_mapping.get(domain_uuid, unknown_domain)["description"])
        for domain_uuid in grouped_by_domain
    }
    name_usage = {}
    for base_name in base_names.values():
        name_usage[base_name] = name_usage.get(base_name, 0) + 1

    for domain_uuid, domain_registros in grouped_by_domain.items():
        domain_info = domain_mapping.get(domain_uuid, unknown_domain)
        domain_description = domain_info["description"]
        file_stem = base_names[domain_uuid]
        if name_usage[file_stem] > 1:
            # Shared description: the UUID keeps the name unique and stable.
            file_stem = f"{file_stem}_{sanitize(str(domain_uuid))}"
        filename = os.path.join(output_dir, f"{file_stem}.csv")
        trunks = trunks_by_domain.get(domain_uuid, [])
        trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado"
        destinations = destinations_by_domain.get(domain_uuid, [])
        destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado"
        # utf-8-sig writes a BOM at the start of the file.
        with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
            writer = csv.writer(csvfile, delimiter=";")
            writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"])
            seen = set()
            for registro in domain_registros:
                # Each extension is written once per domain file.
                key = (registro.get("User", ""), domain_uuid)
                if key in seen:
                    continue
                seen.add(key)
                writer.writerow([
                    registro.get("User", ""),
                    f"{domain_info['name']} ({domain_description})",
                    registro.get("Status", ""),
                    registro.get("Agent", ""),
                    trunks_str,
                    destinations_str,
                ])
        log(f"CSV diário gerado para domínio '{domain_description}': {filename}")
    return output_dir
# Lower-case substrings that mark a user agent as a physical desk phone.
DEVICE_WHITELIST = ["yealink", "fanvil"]


def is_physical_phone(agent):
    """Return True when ``agent`` names a whitelisted physical phone.

    The match is a case-insensitive substring test against every entry of
    ``DEVICE_WHITELIST``.

    Args:
        agent: User-agent text of the device; ``None`` or an empty string
            is treated as "no device" and yields ``False``.
    """
    if not agent:
        return False
    agent_lower = agent.lower()
    return any(marker in agent_lower for marker in DEVICE_WHITELIST)
def _collect_weekly_data(base_dir, days):
    """Aggregate the daily CSV files of ``days`` per domain and extension.

    Returns a dict shaped as
    ``{domain_key: {ramal: {"dias_presente", "agent_mais_recente", "domain_label"}}}``
    where ``domain_key`` is the daily file name without its ``.csv`` suffix.
    """
    weekly_data = {}
    for day in days:
        day_dir = os.path.join(base_dir, day)
        if not os.path.isdir(day_dir):
            log(f" Sem dados para o dia {day}, pulando.")
            continue
        for csv_file in os.listdir(day_dir):
            if not csv_file.endswith(".csv"):
                continue
            # Drop only the trailing extension; ".csv" may also occur
            # elsewhere in a name derived from a domain description.
            domain_key = csv_file[:-len(".csv")]
            filepath = os.path.join(day_dir, csv_file)
            try:
                with open(filepath, "r", encoding="utf-8-sig") as f:
                    for row in csv.DictReader(f, delimiter=";"):
                        # Short rows yield None for the missing columns.
                        ramal = (row.get("Ramal") or "").strip()
                        status = (row.get("Status") or "").strip()
                        agent = (row.get("Dispositivo") or "").strip()
                        domain_label = (row.get("Domínio") or "").strip()
                        if not ramal:
                            continue
                        info = weekly_data.setdefault(domain_key, {}).setdefault(ramal, {
                            "dias_presente": 0,
                            "agent_mais_recente": "",
                            "domain_label": domain_label,
                        })
                        if status and status != "Sem registro":
                            info["dias_presente"] += 1
                        if agent:
                            # Days are processed oldest first, so the last
                            # value written is the most recent one.
                            info["agent_mais_recente"] = agent
                        if domain_label:
                            info["domain_label"] = domain_label
            except Exception as e:
                # One unreadable file must not abort the whole report.
                log(f" Erro ao ler {filepath}: {e}")
    return weekly_data


def _write_weekly_domain_csv(filename, domain_key, ramais):
    """Write one domain's weekly detail CSV.

    Returns a ``(label, count)`` tuple: the label used for the domain in the
    consolidated report and the number of extensions that were present with
    a physical phone.
    """
    domain_label_geral = ""
    telefones_fisicos = 0
    with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
        writer = csv.writer(csvfile, delimiter=";")
        writer.writerow([
            "Ramal",
            "Domínio",
            "Status Semanal",
            "Dias Presente (de 7)",
            "Dispositivo (mais recente)",
        ])
        for ramal, info in sorted(ramais.items()):
            presente = info["dias_presente"] >= 1
            agent = info["agent_mais_recente"]
            domain_label_geral = info["domain_label"] or domain_key
            writer.writerow([
                ramal,
                domain_label_geral,
                "Presente" if presente else "Ausente",
                info["dias_presente"],
                agent,
            ])
            # Only extensions seen during the week with a physical device
            # count towards the consolidated total.
            if presente and is_physical_phone(agent):
                telefones_fisicos += 1
    return domain_label_geral or domain_key, telefones_fisicos


def generate_weekly_report():
    """Build the weekly reports from the daily CSV files of the last 7 days.

    Reads ``csv_registrations/<YYYYMMDD>/*.csv`` next to this script for
    today and the six preceding days and writes into ``csv_semanal/``,
    replacing files of the same name:

    - ``semanal_<domain>.csv``: per-extension detail.
    - ``consolidado_semanal.csv``: per-domain count of extensions that were
      present at least once with a physical phone.

    Returns:
        None. Nothing is written when no daily data is found.
    """
    today = datetime.now()
    script_dir = os.path.dirname(os.path.abspath(__file__))
    base_dir = os.path.join(script_dir, "csv_registrations")
    weekly_dir = os.path.join(script_dir, "csv_semanal")
    os.makedirs(weekly_dir, exist_ok=True)

    # Last 7 days, oldest first.
    days = [(today - timedelta(days=i)).strftime("%Y%m%d") for i in range(6, -1, -1)]
    log(f"Gerando relatório semanal — período {days[0]} a {days[-1]}")

    weekly_data = _collect_weekly_data(base_dir, days)
    if not weekly_data:
        log("Nenhum dado encontrado para gerar relatório semanal.")
        return

    # Consolidated view: {domain label: physical phones present}.
    consolidado = {}
    for domain_key, ramais in weekly_data.items():
        # No timestamp in the name, so each run replaces the previous file.
        filename = os.path.join(weekly_dir, f"semanal_{domain_key}.csv")
        label, telefones_fisicos = _write_weekly_domain_csv(filename, domain_key, ramais)
        consolidado[label] = telefones_fisicos
        log(f"Relatório semanal gerado: {filename}")

    consolidado_path = os.path.join(weekly_dir, "consolidado_semanal.csv")
    with open(consolidado_path, "w", newline="", encoding="utf-8-sig") as csvfile:
        writer = csv.writer(csvfile, delimiter=";")
        writer.writerow(["Domínio", "Telefones Físicos Presentes (Yealink/Fanvil)"])
        for domain_label, qtd in sorted(consolidado.items()):
            writer.writerow([domain_label, qtd])
    log(f"Consolidado semanal gerado: {consolidado_path}")
    log(f"Relatório semanal concluído. {len(weekly_data)} domínios processados.")
def main():
    """Run the daily collection and then rebuild the weekly report.

    Loads the .env file, reads the live registrations, reads extensions,
    domains, trunks and destinations from the database, writes the daily
    CSV files and finally regenerates the weekly report.
    """
    load_env()
    log("=== Iniciando coleta diária ===")

    # Live registrations; an empty command output means nothing to parse.
    raw_output = get_registrations()
    if raw_output:
        registrations = parse_registrations(raw_output)
    else:
        registrations = []

    # Reference data from the database.
    extensions = get_all_users_from_db()
    domains = get_domain_mapping()
    trunks = get_trunks_by_domain()
    ddrs = get_destinations_by_domain()

    # Tag every extension with its registration state and write the day's files.
    merged = merge_registered_and_unregistered(extensions, registrations)
    save_daily_csv(merged, domains, trunks, ddrs)
    log("=== Coleta diária concluída ===")

    log("=== Gerando relatório semanal ===")
    generate_weekly_report()


if __name__ == "__main__":
    main()