#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Report FreeSWITCH SIP registrations per FusionPBX domain.

The script asks ``fs_cli`` for the registrations of one Sofia profile, reads
the extensions, domains, gateways and destinations from the Postgres
database, and writes one CSV file per domain listing every extension together
with its registration status.

Configuration is read from environment variables (optionally preloaded from a
``.env`` file): ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD``,
``DB_NAME``, ``FS_CLI_PATH``, ``FS_PROFILE`` and ``FS_CLI_TIMEOUT``.
"""

import csv
import json
import os
import subprocess
from collections import Counter
from datetime import datetime

try:
    import psycopg2
except ImportError:  # The parsing/CSV helpers remain usable without the driver.
    psycopg2 = None


def log(message):
    """Print *message* to stdout prefixed with the current local timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")


def load_env(path=".env"):
    """Load ``KEY=VALUE`` pairs from a simple .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Surrounding single/double quotes are stripped from values.
    Variables already present in the environment are never overwritten.
    A missing file is silently ignored; read errors are logged as a warning.
    """
    if not os.path.exists(path):
        return
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                value = value.strip().strip('"').strip("'")
                if key and key not in os.environ:
                    os.environ[key] = value
    except (OSError, ValueError) as e:
        # OSError: unreadable file. ValueError: undecodable bytes
        # (UnicodeDecodeError) or a value os.environ rejects.
        log(f"Aviso: erro ao carregar .env: {e}")


def get_db_connection():
    """Open a Postgres connection configured from environment variables.

    Raises:
        RuntimeError: if the ``psycopg2`` driver is not installed.
        ValueError: if ``DB_PORT`` is not an integer.
        psycopg2.Error: if the connection cannot be established.
    """
    if psycopg2 is None:
        raise RuntimeError(
            "psycopg2 não está instalado; não é possível conectar ao Postgres"
        )
    return psycopg2.connect(
        host=os.getenv("DB_HOST", "localhost"),
        port=int(os.getenv("DB_PORT", "5432")),
        user=os.getenv("DB_USER", "fusionpbx"),
        password=os.getenv("DB_PASSWORD", ""),
        # "dbname" is the canonical keyword; "database" is a deprecated alias.
        dbname=os.getenv("DB_NAME", "fusionpbx"),
    )


def _close_quietly(resource):
    """Close a cursor/connection, logging (not raising) any close error."""
    if resource is None:
        return
    try:
        resource.close()
    except Exception as e:  # Cleanup is best-effort; never mask the real result.
        log(f"Aviso: erro ao fechar recurso do banco de dados: {e}")


def _fetch_all(query):
    """Run *query* on a fresh connection and return all rows.

    The cursor and connection are always closed. Connection and query errors
    propagate to the caller, which decides how to report them.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()
    finally:
        _close_quietly(cursor)
        _close_quietly(connection)


def get_registrations():
    """Return the raw ``sofia status profile <profile> reg`` output.

    The ``fs_cli`` binary, profile name and timeout (seconds) come from
    ``FS_CLI_PATH``, ``FS_PROFILE`` and ``FS_CLI_TIMEOUT``.

    Returns:
        The command's stdout, or ``""`` if the command fails, times out or
        cannot be started (the error is logged).
    """
    try:
        fs_cli = os.getenv("FS_CLI_PATH", "fs_cli")
        fs_profile = os.getenv("FS_PROFILE", "internal")
        fs_timeout = int(os.getenv("FS_CLI_TIMEOUT", "10"))
        result = subprocess.run(
            [fs_cli, "-x", f"sofia status profile {fs_profile} reg"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=fs_timeout,
        )
        if result.returncode != 0:
            log(f"Erro ao executar o comando fs_cli: {result.stderr.strip()}")
            return ""
        return result.stdout
    except subprocess.TimeoutExpired:
        log("Timeout ao executar fs_cli")
        return ""
    except Exception as e:
        log(f"Erro ao executar subprocesso: {e}")
        return ""


def _field_value(line):
    """Return the text after the first ``:`` of a ``Name: value`` line."""
    return line.split(":", 1)[1].strip()


def _format_status(status):
    """Translate a raw ``Status:`` value into the report's status text."""
    if "Registered" not in status:
        return "Sem registro"
    marker = "EXP("
    start = status.find(marker)
    if start == -1:
        exp_time = "desconhecido"
    else:
        start += len(marker)
        end = status.find(")", start)
        # Without a closing parenthesis keep the remainder rather than
        # silently dropping the last character.
        exp_time = status[start:end] if end != -1 else status[start:]
    return f"Registrado desde {exp_time}"


def parse_registrations(output):
    """Parse the registration listing into a list of dicts.

    A new record starts at every ``Call-ID:`` line. Recognised fields are
    stored under the keys ``Call-ID``, ``User``, ``Domain``, ``Auth-Realm``,
    ``Agent`` and ``Status``; other lines are ignored. ``Domain`` is the part
    of ``User:`` after ``@`` and falls back to the ``Auth-Realm:`` value when
    the user has no domain part.

    Returns:
        A list of records in input order; empty for empty input.
    """
    registros = []
    current_record = {}
    for raw_line in output.splitlines():
        line = raw_line.strip()
        if line.startswith("Call-ID:"):
            if current_record:  # Flush the previous record.
                registros.append(current_record)
                current_record = {}
            current_record["Call-ID"] = _field_value(line)
        elif line.startswith("User:"):
            name, sep, domain = _field_value(line).partition("@")
            current_record["User"] = name
            current_record["Domain"] = domain if sep else "unknown"
        elif line.startswith("Auth-Realm:"):
            realm = _field_value(line)
            # Keep the realm itself so the records can be grouped by it.
            current_record["Auth-Realm"] = realm
            if current_record.get("Domain", "unknown") == "unknown":
                current_record["Domain"] = realm
        elif line.startswith("Agent:"):
            current_record["Agent"] = _field_value(line)
        elif line.startswith("Status:"):
            current_record["Status"] = _format_status(_field_value(line))
    if current_record:
        registros.append(current_record)
    return registros


def _sanitize_for_filename(text, replace_spaces=False):
    """Replace path separators (and optionally spaces) in *text* with ``_``."""
    sanitized = str(text).replace("/", "_").replace("\\", "_")
    if replace_spaces:
        sanitized = sanitized.replace(" ", "_")
    return sanitized


def save_to_csv(registros):
    """Write all records to one timestamped CSV in ``csv_registrations/``.

    Columns are extension, domain and status. Only the first record of each
    (extension, domain) pair is written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = "csv_registrations"
    os.makedirs(output_dir, exist_ok=True)
    filename = f"{output_dir}/registrations_{timestamp}.csv"
    with open(filename, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Ramal", "Domínio", "Status"])
        seen = set()
        for registro in registros:
            # .get(): records parsed from partial output may lack these keys.
            key = (registro.get("User", ""), registro.get("Domain", ""))
            if key in seen:
                continue
            seen.add(key)
            writer.writerow([key[0], key[1], registro.get("Status", "")])
    log(f"CSV gerado: {filename}")


def save_to_csv_by_auth_realm(registros):
    """Write one semicolon-delimited CSV per ``Auth-Realm`` value.

    Records without an ``Auth-Realm`` key are grouped under ``unknown``.
    Files are written with a UTF-8 BOM (``utf-8-sig``).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = "csv_registrations"
    os.makedirs(output_dir, exist_ok=True)

    grouped_by_realm = {}
    for registro in registros:
        realm = registro.get("Auth-Realm", "unknown")
        grouped_by_realm.setdefault(realm, []).append(registro)

    for auth_realm, realm_registros in grouped_by_realm.items():
        sanitized_realm = _sanitize_for_filename(auth_realm)
        filename = f"{output_dir}/registrations_{sanitized_realm}_{timestamp}.csv"
        with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
            writer = csv.writer(csvfile, delimiter=";")
            writer.writerow(["Usuário", "Registrado em", "Status", "Domínio"])
            for registro in realm_registros:
                writer.writerow([
                    registro.get("User", ""),
                    registro.get("Agent", ""),
                    registro.get("Status", ""),
                    registro.get("Auth-Realm", ""),
                ])
        log(f"CSV gerado para Auth-Realm '{auth_realm}': {filename}")


def save_to_csv_by_domain(registros, domain_mapping, trunks_by_domain, destinations_by_domain):
    """Write one semicolon-delimited CSV per domain, with trunks and DDRs.

    Args:
        registros: records whose ``Domain`` value is the domain UUID.
        domain_mapping: ``{domain_uuid: {"name": ..., "description": ...}}``.
        trunks_by_domain: ``{domain_uuid: [gateway, ...]}``.
        destinations_by_domain: ``{domain_uuid: [destination text, ...]}``.

    The file name is built from the domain description. When several domains
    share the same description (e.g. all domains without one map to
    ``unknown``) the domain name, and if still needed the UUID, is appended so
    that no domain's file overwrites another's. Within a domain only the first
    record per extension is written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = "csv_registrations"
    os.makedirs(output_dir, exist_ok=True)

    grouped_by_domain = {}
    for registro in registros:
        grouped_by_domain.setdefault(registro.get("Domain", "unknown"), []).append(registro)

    unknown_domain = {"name": "unknown", "description": "unknown"}
    domain_infos = {
        domain_uuid: domain_mapping.get(domain_uuid, unknown_domain)
        for domain_uuid in grouped_by_domain
    }
    base_stems = {
        domain_uuid: _sanitize_for_filename(info["description"], replace_spaces=True)
        for domain_uuid, info in domain_infos.items()
    }
    stem_counts = Counter(base_stems.values())
    used_stems = set()

    for domain_uuid, domain_registros in grouped_by_domain.items():
        domain_info = domain_infos[domain_uuid]
        domain_description = domain_info["description"]

        stem = base_stems[domain_uuid]
        if stem_counts[stem] > 1:
            # Shared description: disambiguate with the domain name.
            stem = f"{stem}_{_sanitize_for_filename(domain_info['name'], replace_spaces=True)}"
        if stem in used_stems:
            stem = f"{stem}_{_sanitize_for_filename(domain_uuid, replace_spaces=True)}"
        used_stems.add(stem)
        filename = f"{output_dir}/registrations_{stem}_{timestamp}.csv"

        trunks = trunks_by_domain.get(domain_uuid, [])
        trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado"

        destinations = destinations_by_domain.get(domain_uuid, [])
        destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado"

        # utf-8-sig + ";" delimiter: kept from the original for spreadsheet use.
        with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
            writer = csv.writer(csvfile, delimiter=";")
            writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"])
            seen = set()
            for registro in domain_registros:
                extension = registro.get("User", "")
                if extension in seen:
                    continue
                seen.add(extension)
                writer.writerow([
                    extension,
                    f"{domain_info['name']} ({domain_description})",
                    registro.get("Status", ""),
                    registro.get("Agent", ""),
                    trunks_str,
                    destinations_str,
                ])
        log(f"CSV gerado para domínio '{domain_description}': {filename}")


def save_to_json(registros):
    """Dump the records to a timestamped JSON file in ``json_registrations/``."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = "json_registrations"
    os.makedirs(output_dir, exist_ok=True)
    filename = f"{output_dir}/registrations_{timestamp}.json"
    with open(filename, "w", encoding="utf-8") as jsonfile:
        json.dump(registros, jsonfile, indent=4, ensure_ascii=False)
    log(f"JSON gerado: {filename}")


def get_all_users_from_db():
    """Return every extension with its domain UUID and domain name.

    Returns:
        A list of ``{"User", "DomainUUID", "Domain"}`` dicts, or ``[]`` if the
        query fails (the error is logged).
    """
    query = """
        SELECT v_extensions.extension, v_extensions.domain_uuid, v_domains.domain_name
        FROM v_extensions
        JOIN v_domains ON v_extensions.domain_uuid = v_domains.domain_uuid
    """
    try:
        rows = _fetch_all(query)
    except Exception as e:
        log(f"Erro ao consultar o banco de dados: {e}")
        return []
    return [
        {"User": extension, "DomainUUID": domain_uuid, "Domain": domain_name}
        for extension, domain_uuid, domain_name in rows
    ]


def get_domain_mapping():
    """Return ``{domain_uuid: {"name", "description"}}`` for every domain.

    A missing/empty description is replaced by ``"unknown"``. Returns ``{}``
    if the query fails (the error is logged).
    """
    query = """
        SELECT domain_uuid, domain_name, domain_description
        FROM v_domains
    """
    try:
        rows = _fetch_all(query)
    except Exception as e:
        log(f"Erro ao consultar o banco de dados para domínios: {e}")
        return {}
    return {
        domain_uuid: {"name": name, "description": description or "unknown"}
        for domain_uuid, name, description in rows
    }


def merge_registered_and_unregistered(all_users, registered_users):
    """Attach registration status to every extension known to the database.

    Args:
        all_users: dicts with ``User``, ``DomainUUID`` and ``Domain`` (name).
        registered_users: records from :func:`parse_registrations`; matched to
            extensions by ``(User, Domain name)``. Records lacking either key
            are ignored. When a pair occurs more than once the last wins.

    Returns:
        One dict per entry of *all_users* with ``User``, ``Domain`` (the
        domain UUID, used for grouping), ``Status`` and ``Agent``.
    """
    registered_map = {}
    for reg in registered_users:
        user, domain = reg.get("User"), reg.get("Domain")
        if user is None or domain is None:
            continue  # Incomplete record: cannot be matched to an extension.
        registered_map[(user, domain)] = reg

    merged_users = []
    for user in all_users:
        reg = registered_map.get((user["User"], user["Domain"]))
        merged_users.append({
            "User": user["User"],
            "Domain": user["DomainUUID"],
            "Status": reg.get("Status", "Sem registro") if reg else "Sem registro",
            "Agent": reg.get("Agent", "") if reg else "",
        })
    return merged_users


def get_trunks_by_domain():
    """Return ``{domain_uuid: [gateway, ...]}`` for enabled gateways.

    Returns ``{}`` if the query fails (the error is logged).
    """
    query = """
        SELECT domain_uuid, gateway, enabled
        FROM v_gateways
        WHERE enabled = 'true'
    """
    try:
        rows = _fetch_all(query)
    except Exception as e:
        log(f"Erro ao consultar os troncos no banco de dados: {e}")
        return {}
    trunks_by_domain = {}
    for row in rows:
        trunks_by_domain.setdefault(row[0], []).append(row[1])
    return trunks_by_domain


def get_destinations_by_domain():
    """Return ``{domain_uuid: sorted unique destination labels}``.

    Each label is ``"<number> (Habilitado)"`` or ``"<number> (Desabilitado)"``.
    Returns ``{}`` if the query fails (the error is logged).
    """
    query = """
        SELECT domain_uuid, destination_number, destination_enabled
        FROM v_destinations
    """
    try:
        rows = _fetch_all(query)
    except Exception as e:
        log(f"Erro ao consultar os destinos no banco de dados: {e}")
        return {}

    unique_by_domain = {}
    for domain_uuid, destination_number, enabled in rows:
        # The flag may come back as the text 'true' or as a boolean True,
        # depending on the column type; normalise before comparing.
        is_enabled = str(enabled).lower() == "true"
        label = f"{destination_number} ({'Habilitado' if is_enabled else 'Desabilitado'})"
        unique_by_domain.setdefault(domain_uuid, set()).add(label)
    return {
        domain_uuid: sorted(labels)
        for domain_uuid, labels in unique_by_domain.items()
    }


def main():
    """Collect registrations and database data, then write per-domain CSVs."""
    load_env()

    # Registrations currently known to FreeSWITCH.
    output = get_registrations()
    registered_users = parse_registrations(output) if output else []

    # Reference data from the database.
    all_users = get_all_users_from_db()
    domain_mapping = get_domain_mapping()
    trunks_by_domain = get_trunks_by_domain()
    destinations_by_domain = get_destinations_by_domain()

    # Every extension, registered or not, with its status.
    all_users_with_status = merge_registered_and_unregistered(all_users, registered_users)

    save_to_csv_by_domain(
        all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain
    )


if __name__ == "__main__":
    main()