#!/usr/bin/env python3 # -*- coding: utf-8 -*- import subprocess import csv import os import json from datetime import datetime, timedelta import psycopg2 def log(message): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {message}") def load_env(path=None): """Carrega variáveis de ambiente de um arquivo .env simples.""" if path is None: script_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(script_dir, ".env") if not os.path.exists(path): log(f"Aviso: arquivo .env não encontrado em {path}") return try: with open(path, "r", encoding="utf-8") as f: for raw_line in f: line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value except Exception as e: log(f"Aviso: erro ao carregar .env: {e}") def get_db_connection(): """Cria conexão com o Postgres usando variáveis de ambiente.""" return psycopg2.connect( host=os.getenv("DB_HOST", "localhost"), port=int(os.getenv("DB_PORT", "5432")), user=os.getenv("DB_USER", "fusionpbx"), password=os.getenv("DB_PASSWORD", ""), database=os.getenv("DB_NAME", "fusionpbx"), ) def get_registrations(): """Executa o comando do FreeSWITCH para pegar registros SIP.""" try: fs_cli = os.getenv("FS_CLI_PATH", "fs_cli") fs_profile = os.getenv("FS_PROFILE", "internal") fs_timeout = int(os.getenv("FS_CLI_TIMEOUT", "10")) result = subprocess.run( [fs_cli, '-x', f'sofia status profile {fs_profile} reg'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=fs_timeout ) if result.returncode != 0: log(f"Erro ao executar o comando fs_cli: {result.stderr.strip()}") return "" return result.stdout except subprocess.TimeoutExpired: log("Timeout ao executar fs_cli") return "" except Exception as e: log(f"Erro ao executar subprocesso: {e}") return "" def parse_registrations(output): """Faz o parsing da saída do comando e organiza os dados em JSON.""" registros = [] lines = output.splitlines() current_record = {} for line in lines: line = line.strip() if line.startswith("Call-ID:"): if current_record: registros.append(current_record) current_record = {} current_record["Call-ID"] = line.split(":", 1)[1].strip() elif line.startswith("User:"): user = line.split(":", 1)[1].strip() current_record["User"] = user.split("@")[0] current_record["Domain"] = user.split("@")[1] if "@" in user else "unknown" elif line.startswith("Auth-Realm:"): if "Domain" not in current_record or current_record["Domain"] == "unknown": current_record["Domain"] = line.split(":", 1)[1].strip() elif line.startswith("Agent:"): current_record["Agent"] = line.split(":", 1)[1].strip() elif line.startswith("Status:"): status = line.split(":", 1)[1].strip() if "Registered" in status: exp_start = status.find("EXP(") + 4 exp_end = status.find(")", exp_start) exp_time = status[exp_start:exp_end] if exp_start > 3 else "desconhecido" current_record["Status"] = f"Registrado desde {exp_time}" else: current_record["Status"] = "Sem registro" if current_record: registros.append(current_record) return registros def get_all_users_from_db(): """Obtém todos os ramais e seus domínios do banco de dados.""" connection = None cursor = None try: connection = get_db_connection() cursor = connection.cursor() query = """ SELECT v_extensions.extension, v_extensions.domain_uuid, v_domains.domain_name FROM v_extensions JOIN v_domains ON v_extensions.domain_uuid = v_domains.domain_uuid """ cursor.execute(query) results = cursor.fetchall() return [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results] except Exception as e: log(f"Erro ao consultar o banco de dados: {e}") return [] finally: try: if cursor: cursor.close() if connection: connection.close() except: pass def get_domain_mapping(): """Obtém o mapeamento de domain_uuid para nomes e descrições de domínio.""" connection = None cursor = None try: connection = get_db_connection() cursor = connection.cursor() query = "SELECT domain_uuid, domain_name, domain_description FROM v_domains" cursor.execute(query) results = cursor.fetchall() return { row[0]: {"name": row[1], "description": row[2] or "unknown"} for row in results } except Exception as e: log(f"Erro ao consultar o banco de dados para domínios: {e}") return {} finally: try: if cursor: cursor.close() if connection: connection.close() except: pass def get_trunks_by_domain(): """Obtém os troncos registrados por domínio.""" connection = None cursor = None try: connection = get_db_connection() cursor = connection.cursor() query = "SELECT domain_uuid, gateway FROM v_gateways WHERE enabled = 'true'" cursor.execute(query) results = cursor.fetchall() trunks_by_domain = {} for row in results: domain_uuid, gateway = row[0], row[1] trunks_by_domain.setdefault(domain_uuid, []).append(gateway) return trunks_by_domain except Exception as e: log(f"Erro ao consultar os troncos no banco de dados: {e}") return {} finally: try: if cursor: cursor.close() if connection: connection.close() except: pass def get_destinations_by_domain(): """Obtém os destinos diretos de ramal (DDR) por domínio, sem duplicidades.""" connection = None cursor = None try: connection = get_db_connection() cursor = connection.cursor() query = "SELECT domain_uuid, destination_number, destination_enabled FROM v_destinations" cursor.execute(query) results = cursor.fetchall() destinations_by_domain = {} for row in results: domain_uuid, destination_number, enabled = row entry = f"{destination_number} ({'Habilitado' if enabled == 'true' else 'Desabilitado'})" destinations_by_domain.setdefault(domain_uuid, set()).add(entry) for domain_uuid in destinations_by_domain: destinations_by_domain[domain_uuid] = sorted(destinations_by_domain[domain_uuid]) return destinations_by_domain except Exception as e: log(f"Erro ao consultar os destinos no banco de dados: {e}") return {} finally: try: if cursor: cursor.close() if connection: connection.close() except: pass def merge_registered_and_unregistered(all_users, registered_users): """Combina ramais registrados e não registrados.""" registered_map = { (user["User"], user["Domain"]): user for user in registered_users } merged_users = [] for user in all_users: reg = registered_map.get((user["User"], user["Domain"])) merged_users.append({ "User": user["User"], "Domain": user["DomainUUID"], "Status": reg["Status"] if reg else "Sem registro", "Agent": reg.get("Agent", "") if reg else "" }) return merged_users def save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain): """Salva os registros do dia em arquivos CSV separados por domínio.""" today = datetime.now().strftime("%Y%m%d") output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "csv_registrations", today) os.makedirs(output_dir, exist_ok=True) grouped_by_domain = {} for registro in all_users_with_status: domain_uuid = registro.get("Domain", "unknown") grouped_by_domain.setdefault(domain_uuid, []).append(registro) for domain_uuid, domain_registros in grouped_by_domain.items(): domain_info = domain_mapping.get(domain_uuid, {"name": "unknown", "description": "unknown"}) domain_description = domain_info["description"] sanitized = domain_description.replace("/", "_").replace("\\", "_").replace(" ", "_") filename = os.path.join(output_dir, f"{sanitized}.csv") trunks = trunks_by_domain.get(domain_uuid, []) trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado" destinations = destinations_by_domain.get(domain_uuid, []) destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado" with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: writer = csv.writer(csvfile, delimiter=";") writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"]) seen = set() for registro in domain_registros: key = (registro["User"], domain_uuid) if key not in seen: seen.add(key) writer.writerow([ registro.get("User", ""), f"{domain_info['name']} ({domain_description})", registro.get("Status", ""), registro.get("Agent", ""), trunks_str, destinations_str ]) log(f"CSV diário gerado para domínio '{domain_description}': {filename}") return output_dir DEVICE_WHITELIST = ["yealink", "fanvil"] def is_physical_phone(agent): """Retorna True se o dispositivo for Yealink ou Fanvil (telefone físico).""" agent_lower = agent.lower() return any(w in agent_lower for w in DEVICE_WHITELIST) def generate_weekly_report(): """ Gera relatório semanal consolidado todo dia, baseado nos últimos 7 dias. Substitui os arquivos anteriores. - semanal_.csv — detalhe por ramal - consolidado_semanal.csv — resumo com contagem de telefones físicos por domínio """ today = datetime.now() script_dir = os.path.dirname(os.path.abspath(__file__)) base_dir = os.path.join(script_dir, "csv_registrations") weekly_dir = os.path.join(script_dir, "csv_semanal") os.makedirs(weekly_dir, exist_ok=True) # Últimos 7 dias (do mais antigo ao mais recente) days = [(today - timedelta(days=i)).strftime("%Y%m%d") for i in range(6, -1, -1)] log(f"Gerando relatório semanal — período {days[0]} a {days[-1]}") # Estrutura: { domain_key: { ramal: { dias_presente, agent_mais_recente, domain_label } } } weekly_data = {} for day in days: day_dir = os.path.join(base_dir, day) if not os.path.exists(day_dir): log(f" Sem dados para o dia {day}, pulando.") continue for csv_file in os.listdir(day_dir): if not csv_file.endswith(".csv"): continue domain_key = csv_file.replace(".csv", "") filepath = os.path.join(day_dir, csv_file) try: with open(filepath, "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f, delimiter=";") for row in reader: ramal = row.get("Ramal", "").strip() status = row.get("Status", "").strip() agent = row.get("Dispositivo", "").strip() domain_label = row.get("Domínio", "").strip() if not ramal: continue weekly_data.setdefault(domain_key, {}) weekly_data[domain_key].setdefault(ramal, { "dias_presente": 0, "agent_mais_recente": "", "domain_label": domain_label }) if status and status != "Sem registro": weekly_data[domain_key][ramal]["dias_presente"] += 1 if agent: # Sempre sobrescreve — último dia processado = mais recente weekly_data[domain_key][ramal]["agent_mais_recente"] = agent if domain_label: weekly_data[domain_key][ramal]["domain_label"] = domain_label except Exception as e: log(f" Erro ao ler {filepath}: {e}") if not weekly_data: log("Nenhum dado encontrado para gerar relatório semanal.") return # Consolidado: { domain_label: contagem de telefones físicos presentes } consolidado = {} # Gera um CSV por domínio — substitui o anterior sem timestamp no nome for domain_key, ramais in weekly_data.items(): filename = os.path.join(weekly_dir, f"semanal_{domain_key}.csv") domain_label_geral = "" telefones_fisicos = 0 with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: writer = csv.writer(csvfile, delimiter=";") writer.writerow([ "Ramal", "Domínio", "Status Semanal", "Dias Presente (de 7)", "Dispositivo (mais recente)" ]) for ramal, info in sorted(ramais.items()): presente = info["dias_presente"] >= 1 status_semanal = "Presente" if presente else "Ausente" agent = info["agent_mais_recente"] domain_label_geral = info["domain_label"] or domain_key writer.writerow([ ramal, domain_label_geral, status_semanal, info["dias_presente"], agent ]) # Conta para o consolidado: presente + dispositivo físico if presente and is_physical_phone(agent): telefones_fisicos += 1 consolidado[domain_label_geral or domain_key] = telefones_fisicos log(f"Relatório semanal gerado: {filename}") # Gera o consolidado geral — substitui o anterior consolidado_path = os.path.join(weekly_dir, "consolidado_semanal.csv") with open(consolidado_path, "w", newline="", encoding="utf-8-sig") as csvfile: writer = csv.writer(csvfile, delimiter=";") writer.writerow(["Domínio", "Telefones Físicos Presentes (Yealink/Fanvil)"]) for domain_label, qtd in sorted(consolidado.items()): writer.writerow([domain_label, qtd]) log(f"Consolidado semanal gerado: {consolidado_path}") log(f"Relatório semanal concluído. {len(weekly_data)} domínios processados.") def main(): load_env() log("=== Iniciando coleta diária ===") # Obtém registros do FreeSWITCH output = get_registrations() registered_users = parse_registrations(output) if output else [] # Obtém dados do banco all_users = get_all_users_from_db() domain_mapping = get_domain_mapping() trunks_by_domain = get_trunks_by_domain() destinations_by_domain = get_destinations_by_domain() # Combina registrados e não registrados all_users_with_status = merge_registered_and_unregistered(all_users, registered_users) # Salva CSV do dia save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain) log("=== Coleta diária concluída ===") log("=== Gerando relatório semanal ===") generate_weekly_report() if __name__ == "__main__": main()