diff --git a/coleta_registro.py b/coleta_registro.py index d6b2315..cd16545 100644 --- a/coleta_registro.py +++ b/coleta_registro.py @@ -5,7 +5,7 @@ import subprocess import csv import os import json -from datetime import datetime +from datetime import datetime, timedelta import psycopg2 @@ -14,9 +14,13 @@ def log(message): print(f"[{timestamp}] {message}") -def load_env(path=".env"): +def load_env(path=None): """Carrega variáveis de ambiente de um arquivo .env simples.""" + if path is None: + script_dir = os.path.dirname(os.path.abspath(__file__)) + path = os.path.join(script_dir, ".env") if not os.path.exists(path): + log(f"Aviso: arquivo .env não encontrado em {path}") return try: with open(path, "r", encoding="utf-8") as f: @@ -43,8 +47,9 @@ def get_db_connection(): database=os.getenv("DB_NAME", "fusionpbx"), ) + def get_registrations(): - """Executa o comando do FreeSWITCH para pegar registros SIP""" + """Executa o comando do FreeSWITCH para pegar registros SIP.""" try: fs_cli = os.getenv("FS_CLI_PATH", "fs_cli") fs_profile = os.getenv("FS_PROFILE", "internal") @@ -67,8 +72,9 @@ def get_registrations(): log(f"Erro ao executar subprocesso: {e}") return "" + def parse_registrations(output): - """Faz o parsing da saída do comando e organiza os dados em JSON""" + """Faz o parsing da saída do comando e organiza os dados em JSON.""" registros = [] lines = output.splitlines() current_record = {} @@ -76,7 +82,7 @@ def parse_registrations(output): for line in lines: line = line.strip() if line.startswith("Call-ID:"): - if current_record: # Salva o registro anterior + if current_record: registros.append(current_record) current_record = {} current_record["Call-ID"] = line.split(":", 1)[1].strip() @@ -104,127 +110,6 @@ def parse_registrations(output): return registros -def save_to_csv(registros): - """Salva os registros em arquivos CSV com colunas formatadas""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - output_dir = "csv_registrations" - os.makedirs(output_dir, exist_ok=True) - - filename = f"{output_dir}/registrations_{timestamp}.csv" - with open(filename, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.writer(csvfile) - # Escreve o cabeçalho formatado - writer.writerow(["Ramal", "Domínio", "Status"]) - - # Remove duplicatas e escreve os registros - seen = set() - for registro in registros: - key = (registro["User"], registro["Domain"]) # Identifica duplicatas por Ramal e Domínio - if key not in seen: - seen.add(key) - writer.writerow([ - registro.get("User", ""), - registro.get("Domain", ""), - registro.get("Status", "") - ]) - - log(f"CSV gerado: {filename}") - -def save_to_csv_by_auth_realm(registros): - """Salva os registros em arquivos CSV separados por Auth-Realm com colunas específicas""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - output_dir = "csv_registrations" - os.makedirs(output_dir, exist_ok=True) - - # Group registros by Auth-Realm - grouped_by_realm = {} - for registro in registros: - auth_realm = registro.get("Auth-Realm", "unknown") - if auth_realm not in grouped_by_realm: - grouped_by_realm[auth_realm] = [] - grouped_by_realm[auth_realm].append(registro) - - # Create a CSV file for each Auth-Realm - for auth_realm, realm_registros in grouped_by_realm.items(): - sanitized_realm = auth_realm.replace("/", "_").replace("\\", "_") # Sanitize filename - filename = f"{output_dir}/registrations_{sanitized_realm}_{timestamp}.csv" - with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: # Use utf-8-sig encoding - # Use csv.writer with explicit delimiter - writer = csv.writer(csvfile, delimiter=";") # Use semicolon as delimiter - # Write header with updated column names - writer.writerow(["Usuário", "Registrado em", "Status", "Domínio"]) - for registro in realm_registros: - # Write only the selected columns with updated names - writer.writerow([ - registro.get("User", ""), - registro.get("Agent", ""), - registro.get("Status", ""), - registro.get("Auth-Realm", "") - ]) - - log(f"CSV gerado para Auth-Realm '{auth_realm}': {filename}") - -def save_to_csv_by_domain(registros, domain_mapping, trunks_by_domain, destinations_by_domain): - """Salva os registros em arquivos CSV separados por domínio, incluindo troncos e DDR.""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - output_dir = "csv_registrations" - os.makedirs(output_dir, exist_ok=True) - - # Agrupa os registros por domínio - grouped_by_domain = {} - for registro in registros: - domain_uuid = registro.get("Domain", "unknown") - if domain_uuid not in grouped_by_domain: - grouped_by_domain[domain_uuid] = [] - grouped_by_domain[domain_uuid].append(registro) - - # Cria um arquivo CSV para cada domínio - for domain_uuid, domain_registros in grouped_by_domain.items(): - domain_info = domain_mapping.get(domain_uuid, {"name": "unknown", "description": "unknown"}) - domain_description = domain_info["description"] - sanitized_description = domain_description.replace("/", "_").replace("\\", "_").replace(" ", "_") # Sanitiza a descrição - filename = f"{output_dir}/registrations_{sanitized_description}_{timestamp}.csv" - - # Obtém os troncos para o domínio - trunks = trunks_by_domain.get(domain_uuid, []) - trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado" - - # Obtém os destinos para o domínio - destinations = destinations_by_domain.get(domain_uuid, []) - destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado" - - with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: # Use utf-8-sig para compatibilidade com Excel - writer = csv.writer(csvfile, delimiter=";") # Use ; como delimitador - # Escreve o cabeçalho corrigido - writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"]) - # Escreve os registros - seen = set() - for registro in domain_registros: - key = (registro["User"], domain_uuid) # Identifica duplicatas por Ramal e UUID do Domínio - if key not in seen: - seen.add(key) - writer.writerow([ - registro.get("User", ""), - f"{domain_info['name']} ({domain_description})", # Nome e descrição no campo "Domínio" - registro.get("Status", ""), - registro.get("Agent", ""), # <-- Adiciona o dispositivo - trunks_str, # Adiciona os troncos - destinations_str # Adiciona os DDR - ]) - - log(f"CSV gerado para domínio '{domain_description}': {filename}") - -def save_to_json(registros): - """Salva os registros em um arquivo JSON""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - output_dir = "json_registrations" - os.makedirs(output_dir, exist_ok=True) - - filename = f"{output_dir}/registrations_{timestamp}.json" - with open(filename, "w", encoding="utf-8") as jsonfile: - json.dump(registros, jsonfile, indent=4, ensure_ascii=False) - - log(f"JSON gerado: {filename}") def get_all_users_from_db(): """Obtém todos os ramais e seus domínios do banco de dados.""" @@ -240,20 +125,18 @@ def get_all_users_from_db(): """ cursor.execute(query) results = cursor.fetchall() - all_users = [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results] - return all_users + return [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results] except Exception as e: log(f"Erro ao consultar o banco de dados: {e}") return [] finally: try: - if cursor: - cursor.close() - if connection: - connection.close() + if cursor: cursor.close() + if connection: connection.close() except: pass + def get_domain_mapping(): """Obtém o mapeamento de domain_uuid para nomes e descrições de domínio.""" connection = None @@ -261,48 +144,23 @@ def get_domain_mapping(): try: connection = get_db_connection() cursor = connection.cursor() - query = """ - SELECT domain_uuid, domain_name, domain_description - FROM v_domains - """ + query = "SELECT domain_uuid, domain_name, domain_description FROM v_domains" cursor.execute(query) results = cursor.fetchall() - domain_mapping = { - row[0]: {"name": row[1], "description": row[2] or "unknown"} for row in results + return { + row[0]: {"name": row[1], "description": row[2] or "unknown"} + for row in results } - return domain_mapping except Exception as e: log(f"Erro ao consultar o banco de dados para domínios: {e}") return {} finally: try: - if cursor: - cursor.close() - if connection: - connection.close() + if cursor: cursor.close() + if connection: connection.close() except: pass -def merge_registered_and_unregistered(all_users, registered_users): - """Combina ramais registrados e não registrados, filtrando por domínio (nome).""" - # O registered_map usa (User, Domain) onde Domain é o NOME do domínio - registered_map = { - (user["User"], user["Domain"]): user - for user in registered_users - } - - merged_users = [] - for user in all_users: - # Compare pelo nome do domínio - reg = registered_map.get((user["User"], user["Domain"])) - merged_users.append({ - "User": user["User"], - "Domain": user["DomainUUID"], # Para agrupamento e uso do UUID - "Status": reg["Status"] if reg else "Sem registro", - "Agent": reg["Agent"] if reg and "Agent" in reg else "" - }) - - return merged_users def get_trunks_by_domain(): """Obtém os troncos registrados por domínio.""" @@ -311,33 +169,25 @@ def get_trunks_by_domain(): try: connection = get_db_connection() cursor = connection.cursor() - query = """ - SELECT domain_uuid, gateway, enabled - FROM v_gateways - WHERE enabled = 'true' - """ + query = "SELECT domain_uuid, gateway FROM v_gateways WHERE enabled = 'true'" cursor.execute(query) results = cursor.fetchall() trunks_by_domain = {} for row in results: - domain_uuid = row[0] - gateway = row[1] - if domain_uuid not in trunks_by_domain: - trunks_by_domain[domain_uuid] = [] - trunks_by_domain[domain_uuid].append(gateway) + domain_uuid, gateway = row[0], row[1] + trunks_by_domain.setdefault(domain_uuid, []).append(gateway) return trunks_by_domain except Exception as e: log(f"Erro ao consultar os troncos no banco de dados: {e}") return {} finally: try: - if cursor: - cursor.close() - if connection: - connection.close() + if cursor: cursor.close() + if connection: connection.close() except: pass + def get_destinations_by_domain(): """Obtém os destinos diretos de ramal (DDR) por domínio, sem duplicidades.""" connection = None @@ -345,63 +195,223 @@ def get_destinations_by_domain(): try: connection = get_db_connection() cursor = connection.cursor() - query = """ - SELECT domain_uuid, destination_number, destination_enabled - FROM v_destinations - """ + query = "SELECT domain_uuid, destination_number, destination_enabled FROM v_destinations" cursor.execute(query) results = cursor.fetchall() destinations_by_domain = {} for row in results: - domain_uuid = row[0] - destination_number = row[1] - enabled = row[2] - destination_entry = f"{destination_number} ({'Habilitado' if enabled == 'true' else 'Desabilitado'})" - if domain_uuid not in destinations_by_domain: - destinations_by_domain[domain_uuid] = set() # Usar um conjunto para evitar duplicidades - destinations_by_domain[domain_uuid].add(destination_entry) # Adiciona ao conjunto - - # Converte os conjuntos para listas ordenadas + domain_uuid, destination_number, enabled = row + entry = f"{destination_number} ({'Habilitado' if enabled == 'true' else 'Desabilitado'})" + destinations_by_domain.setdefault(domain_uuid, set()).add(entry) for domain_uuid in destinations_by_domain: destinations_by_domain[domain_uuid] = sorted(destinations_by_domain[domain_uuid]) - return destinations_by_domain except Exception as e: log(f"Erro ao consultar os destinos no banco de dados: {e}") return {} finally: try: - if cursor: - cursor.close() - if connection: - connection.close() + if cursor: cursor.close() + if connection: connection.close() except: pass -def main(): + +def merge_registered_and_unregistered(all_users, registered_users): + """Combina ramais registrados e não registrados.""" + registered_map = { + (user["User"], user["Domain"]): user + for user in registered_users + } + merged_users = [] + for user in all_users: + reg = registered_map.get((user["User"], user["Domain"])) + merged_users.append({ + "User": user["User"], + "Domain": user["DomainUUID"], + "Status": reg["Status"] if reg else "Sem registro", + "Agent": reg.get("Agent", "") if reg else "" + }) + return merged_users + + +def save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain): + """Salva os registros do dia em arquivos CSV separados por domínio.""" + today = datetime.now().strftime("%Y%m%d") + output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "csv_registrations", today) + os.makedirs(output_dir, exist_ok=True) + + grouped_by_domain = {} + for registro in all_users_with_status: + domain_uuid = registro.get("Domain", "unknown") + grouped_by_domain.setdefault(domain_uuid, []).append(registro) + + for domain_uuid, domain_registros in grouped_by_domain.items(): + domain_info = domain_mapping.get(domain_uuid, {"name": "unknown", "description": "unknown"}) + domain_description = domain_info["description"] + sanitized = domain_description.replace("/", "_").replace("\\", "_").replace(" ", "_") + filename = os.path.join(output_dir, f"{sanitized}.csv") + + trunks = trunks_by_domain.get(domain_uuid, []) + trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado" + destinations = destinations_by_domain.get(domain_uuid, []) + destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado" + + with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: + writer = csv.writer(csvfile, delimiter=";") + writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"]) + seen = set() + for registro in domain_registros: + key = (registro["User"], domain_uuid) + if key not in seen: + seen.add(key) + writer.writerow([ + registro.get("User", ""), + f"{domain_info['name']} ({domain_description})", + registro.get("Status", ""), + registro.get("Agent", ""), + trunks_str, + destinations_str + ]) + + log(f"CSV diário gerado para domínio '{domain_description}': {filename}") + + return output_dir + + +def generate_weekly_report(): + """ + Gera relatório semanal consolidado. + Lê os CSVs dos últimos 7 dias e consolida por ramal/domínio. + Um ramal é contado como 'Presente' se apareceu registrado em ao menos 1 dia. + """ + today = datetime.now() script_dir = os.path.dirname(os.path.abspath(__file__)) - load_env(os.path.join(script_dir, ".env")) - # Obter registros do FreeSWITCH + base_dir = os.path.join(script_dir, "csv_registrations") + weekly_dir = os.path.join(script_dir, "csv_semanal") + os.makedirs(weekly_dir, exist_ok=True) + + # Coleta os últimos 7 dias + days = [(today - timedelta(days=i)).strftime("%Y%m%d") for i in range(6, -1, -1)] + week_label = f"{days[0]}_a_{days[-1]}" + + log(f"Gerando relatório semanal para o período {days[0]} a {days[-1]}") + + # Estrutura: { domain_description: { ramal: { dias_presente, agent_mais_recente, domain_label } } } + weekly_data = {} + + for day in days: + day_dir = os.path.join(base_dir, day) + if not os.path.exists(day_dir): + log(f" Sem dados para o dia {day}, pulando.") + continue + + for csv_file in os.listdir(day_dir): + if not csv_file.endswith(".csv"): + continue + domain_key = csv_file.replace(".csv", "") + filepath = os.path.join(day_dir, csv_file) + + try: + with open(filepath, "r", encoding="utf-8-sig") as f: + reader = csv.DictReader(f, delimiter=";") + for row in reader: + ramal = row.get("Ramal", "").strip() + status = row.get("Status", "").strip() + agent = row.get("Dispositivo", "").strip() + domain_label = row.get("Domínio", "").strip() + + if not ramal: + continue + + if domain_key not in weekly_data: + weekly_data[domain_key] = {} + + if ramal not in weekly_data[domain_key]: + weekly_data[domain_key][ramal] = { + "dias_presente": 0, + "agent_mais_recente": "", + "domain_label": domain_label + } + + # Conta como presente se tiver qualquer status de registro + if status and status != "Sem registro": + weekly_data[domain_key][ramal]["dias_presente"] += 1 + # Atualiza o agent para o mais recente (último dia processado) + if agent: + weekly_data[domain_key][ramal]["agent_mais_recente"] = agent + + if domain_label: + weekly_data[domain_key][ramal]["domain_label"] = domain_label + + except Exception as e: + log(f" Erro ao ler {filepath}: {e}") + + if not weekly_data: + log("Nenhum dado encontrado para gerar relatório semanal.") + return + + # Gera um CSV por domínio + for domain_key, ramais in weekly_data.items(): + filename = os.path.join(weekly_dir, f"semanal_{domain_key}_{week_label}.csv") + + with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: + writer = csv.writer(csvfile, delimiter=";") + writer.writerow([ + "Ramal", + "Domínio", + "Status Semanal", + "Dias Presente (de 7)", + "Dispositivo (mais recente)" + ]) + for ramal, info in sorted(ramais.items()): + status_semanal = "Presente" if info["dias_presente"] >= 1 else "Ausente" + writer.writerow([ + ramal, + info["domain_label"], + status_semanal, + info["dias_presente"], + info["agent_mais_recente"] + ]) + + log(f"Relatório semanal gerado: {filename}") + + log(f"Relatório semanal concluído. {len(weekly_data)} domínios processados.") + + +def is_sunday(): + """Retorna True se hoje for domingo.""" + return datetime.now().weekday() == 6 + + +def main(): + load_env() + + log("=== Iniciando coleta diária ===") + + # Obtém registros do FreeSWITCH output = get_registrations() registered_users = parse_registrations(output) if output else [] - # Obter todos os ramais do banco de dados + # Obtém dados do banco all_users = get_all_users_from_db() - - # Obter o mapeamento de domain_uuid para nomes e descrições de domínio domain_mapping = get_domain_mapping() - - # Obter os troncos registrados por domínio trunks_by_domain = get_trunks_by_domain() - - # Obter os destinos diretos de ramal (DDR) por domínio destinations_by_domain = get_destinations_by_domain() - # Combinar ramais registrados e não registrados + # Combina registrados e não registrados all_users_with_status = merge_registered_and_unregistered(all_users, registered_users) - # Salvar os resultados em arquivos CSV separados por domínio - save_to_csv_by_domain(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain) + # Salva CSV do dia + save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain) + + log("=== Coleta diária concluída ===") + + # Se for domingo, gera o relatório semanal consolidado + if is_sunday(): + log("=== Domingo detectado — gerando relatório semanal ===") + generate_weekly_report() + if __name__ == "__main__": - main() + main() \ No newline at end of file