REFACTOR: Atualiza funções de carregamento e salvamento, adiciona geração de CSV diário e relatório semanal
This commit is contained in:
parent
b0d653e809
commit
cc7fa5e7dd
@ -5,7 +5,7 @@ import subprocess
|
|||||||
import csv
|
import csv
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
|
||||||
|
|
||||||
@ -14,9 +14,13 @@ def log(message):
|
|||||||
print(f"[{timestamp}] {message}")
|
print(f"[{timestamp}] {message}")
|
||||||
|
|
||||||
|
|
||||||
def load_env(path=".env"):
|
def load_env(path=None):
|
||||||
"""Carrega variáveis de ambiente de um arquivo .env simples."""
|
"""Carrega variáveis de ambiente de um arquivo .env simples."""
|
||||||
|
if path is None:
|
||||||
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
path = os.path.join(script_dir, ".env")
|
||||||
if not os.path.exists(path):
|
if not os.path.exists(path):
|
||||||
|
log(f"Aviso: arquivo .env não encontrado em {path}")
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
with open(path, "r", encoding="utf-8") as f:
|
with open(path, "r", encoding="utf-8") as f:
|
||||||
@ -43,8 +47,9 @@ def get_db_connection():
|
|||||||
database=os.getenv("DB_NAME", "fusionpbx"),
|
database=os.getenv("DB_NAME", "fusionpbx"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_registrations():
|
def get_registrations():
|
||||||
"""Executa o comando do FreeSWITCH para pegar registros SIP"""
|
"""Executa o comando do FreeSWITCH para pegar registros SIP."""
|
||||||
try:
|
try:
|
||||||
fs_cli = os.getenv("FS_CLI_PATH", "fs_cli")
|
fs_cli = os.getenv("FS_CLI_PATH", "fs_cli")
|
||||||
fs_profile = os.getenv("FS_PROFILE", "internal")
|
fs_profile = os.getenv("FS_PROFILE", "internal")
|
||||||
@ -67,8 +72,9 @@ def get_registrations():
|
|||||||
log(f"Erro ao executar subprocesso: {e}")
|
log(f"Erro ao executar subprocesso: {e}")
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
def parse_registrations(output):
|
def parse_registrations(output):
|
||||||
"""Faz o parsing da saída do comando e organiza os dados em JSON"""
|
"""Faz o parsing da saída do comando e organiza os dados em JSON."""
|
||||||
registros = []
|
registros = []
|
||||||
lines = output.splitlines()
|
lines = output.splitlines()
|
||||||
current_record = {}
|
current_record = {}
|
||||||
@ -76,7 +82,7 @@ def parse_registrations(output):
|
|||||||
for line in lines:
|
for line in lines:
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line.startswith("Call-ID:"):
|
if line.startswith("Call-ID:"):
|
||||||
if current_record: # Salva o registro anterior
|
if current_record:
|
||||||
registros.append(current_record)
|
registros.append(current_record)
|
||||||
current_record = {}
|
current_record = {}
|
||||||
current_record["Call-ID"] = line.split(":", 1)[1].strip()
|
current_record["Call-ID"] = line.split(":", 1)[1].strip()
|
||||||
@ -104,127 +110,6 @@ def parse_registrations(output):
|
|||||||
|
|
||||||
return registros
|
return registros
|
||||||
|
|
||||||
def save_to_csv(registros):
|
|
||||||
"""Salva os registros em arquivos CSV com colunas formatadas"""
|
|
||||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
output_dir = "csv_registrations"
|
|
||||||
os.makedirs(output_dir, exist_ok=True)
|
|
||||||
|
|
||||||
filename = f"{output_dir}/registrations_{timestamp}.csv"
|
|
||||||
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
|
|
||||||
writer = csv.writer(csvfile)
|
|
||||||
# Escreve o cabeçalho formatado
|
|
||||||
writer.writerow(["Ramal", "Domínio", "Status"])
|
|
||||||
|
|
||||||
# Remove duplicatas e escreve os registros
|
|
||||||
seen = set()
|
|
||||||
for registro in registros:
|
|
||||||
key = (registro["User"], registro["Domain"]) # Identifica duplicatas por Ramal e Domínio
|
|
||||||
if key not in seen:
|
|
||||||
seen.add(key)
|
|
||||||
writer.writerow([
|
|
||||||
registro.get("User", ""),
|
|
||||||
registro.get("Domain", ""),
|
|
||||||
registro.get("Status", "")
|
|
||||||
])
|
|
||||||
|
|
||||||
log(f"CSV gerado: {filename}")
|
|
||||||
|
|
||||||
def save_to_csv_by_auth_realm(registros):
|
|
||||||
"""Salva os registros em arquivos CSV separados por Auth-Realm com colunas específicas"""
|
|
||||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
output_dir = "csv_registrations"
|
|
||||||
os.makedirs(output_dir, exist_ok=True)
|
|
||||||
|
|
||||||
# Group registros by Auth-Realm
|
|
||||||
grouped_by_realm = {}
|
|
||||||
for registro in registros:
|
|
||||||
auth_realm = registro.get("Auth-Realm", "unknown")
|
|
||||||
if auth_realm not in grouped_by_realm:
|
|
||||||
grouped_by_realm[auth_realm] = []
|
|
||||||
grouped_by_realm[auth_realm].append(registro)
|
|
||||||
|
|
||||||
# Create a CSV file for each Auth-Realm
|
|
||||||
for auth_realm, realm_registros in grouped_by_realm.items():
|
|
||||||
sanitized_realm = auth_realm.replace("/", "_").replace("\\", "_") # Sanitize filename
|
|
||||||
filename = f"{output_dir}/registrations_{sanitized_realm}_{timestamp}.csv"
|
|
||||||
with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: # Use utf-8-sig encoding
|
|
||||||
# Use csv.writer with explicit delimiter
|
|
||||||
writer = csv.writer(csvfile, delimiter=";") # Use semicolon as delimiter
|
|
||||||
# Write header with updated column names
|
|
||||||
writer.writerow(["Usuário", "Registrado em", "Status", "Domínio"])
|
|
||||||
for registro in realm_registros:
|
|
||||||
# Write only the selected columns with updated names
|
|
||||||
writer.writerow([
|
|
||||||
registro.get("User", ""),
|
|
||||||
registro.get("Agent", ""),
|
|
||||||
registro.get("Status", ""),
|
|
||||||
registro.get("Auth-Realm", "")
|
|
||||||
])
|
|
||||||
|
|
||||||
log(f"CSV gerado para Auth-Realm '{auth_realm}': {filename}")
|
|
||||||
|
|
||||||
def save_to_csv_by_domain(registros, domain_mapping, trunks_by_domain, destinations_by_domain):
|
|
||||||
"""Salva os registros em arquivos CSV separados por domínio, incluindo troncos e DDR."""
|
|
||||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
output_dir = "csv_registrations"
|
|
||||||
os.makedirs(output_dir, exist_ok=True)
|
|
||||||
|
|
||||||
# Agrupa os registros por domínio
|
|
||||||
grouped_by_domain = {}
|
|
||||||
for registro in registros:
|
|
||||||
domain_uuid = registro.get("Domain", "unknown")
|
|
||||||
if domain_uuid not in grouped_by_domain:
|
|
||||||
grouped_by_domain[domain_uuid] = []
|
|
||||||
grouped_by_domain[domain_uuid].append(registro)
|
|
||||||
|
|
||||||
# Cria um arquivo CSV para cada domínio
|
|
||||||
for domain_uuid, domain_registros in grouped_by_domain.items():
|
|
||||||
domain_info = domain_mapping.get(domain_uuid, {"name": "unknown", "description": "unknown"})
|
|
||||||
domain_description = domain_info["description"]
|
|
||||||
sanitized_description = domain_description.replace("/", "_").replace("\\", "_").replace(" ", "_") # Sanitiza a descrição
|
|
||||||
filename = f"{output_dir}/registrations_{sanitized_description}_{timestamp}.csv"
|
|
||||||
|
|
||||||
# Obtém os troncos para o domínio
|
|
||||||
trunks = trunks_by_domain.get(domain_uuid, [])
|
|
||||||
trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado"
|
|
||||||
|
|
||||||
# Obtém os destinos para o domínio
|
|
||||||
destinations = destinations_by_domain.get(domain_uuid, [])
|
|
||||||
destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado"
|
|
||||||
|
|
||||||
with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile: # Use utf-8-sig para compatibilidade com Excel
|
|
||||||
writer = csv.writer(csvfile, delimiter=";") # Use ; como delimitador
|
|
||||||
# Escreve o cabeçalho corrigido
|
|
||||||
writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"])
|
|
||||||
# Escreve os registros
|
|
||||||
seen = set()
|
|
||||||
for registro in domain_registros:
|
|
||||||
key = (registro["User"], domain_uuid) # Identifica duplicatas por Ramal e UUID do Domínio
|
|
||||||
if key not in seen:
|
|
||||||
seen.add(key)
|
|
||||||
writer.writerow([
|
|
||||||
registro.get("User", ""),
|
|
||||||
f"{domain_info['name']} ({domain_description})", # Nome e descrição no campo "Domínio"
|
|
||||||
registro.get("Status", ""),
|
|
||||||
registro.get("Agent", ""), # <-- Adiciona o dispositivo
|
|
||||||
trunks_str, # Adiciona os troncos
|
|
||||||
destinations_str # Adiciona os DDR
|
|
||||||
])
|
|
||||||
|
|
||||||
log(f"CSV gerado para domínio '{domain_description}': {filename}")
|
|
||||||
|
|
||||||
def save_to_json(registros):
|
|
||||||
"""Salva os registros em um arquivo JSON"""
|
|
||||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
output_dir = "json_registrations"
|
|
||||||
os.makedirs(output_dir, exist_ok=True)
|
|
||||||
|
|
||||||
filename = f"{output_dir}/registrations_{timestamp}.json"
|
|
||||||
with open(filename, "w", encoding="utf-8") as jsonfile:
|
|
||||||
json.dump(registros, jsonfile, indent=4, ensure_ascii=False)
|
|
||||||
|
|
||||||
log(f"JSON gerado: {filename}")
|
|
||||||
|
|
||||||
def get_all_users_from_db():
|
def get_all_users_from_db():
|
||||||
"""Obtém todos os ramais e seus domínios do banco de dados."""
|
"""Obtém todos os ramais e seus domínios do banco de dados."""
|
||||||
@ -240,20 +125,18 @@ def get_all_users_from_db():
|
|||||||
"""
|
"""
|
||||||
cursor.execute(query)
|
cursor.execute(query)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
all_users = [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results]
|
return [{"User": row[0], "DomainUUID": row[1], "Domain": row[2]} for row in results]
|
||||||
return all_users
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"Erro ao consultar o banco de dados: {e}")
|
log(f"Erro ao consultar o banco de dados: {e}")
|
||||||
return []
|
return []
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
if cursor:
|
if cursor: cursor.close()
|
||||||
cursor.close()
|
if connection: connection.close()
|
||||||
if connection:
|
|
||||||
connection.close()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def get_domain_mapping():
|
def get_domain_mapping():
|
||||||
"""Obtém o mapeamento de domain_uuid para nomes e descrições de domínio."""
|
"""Obtém o mapeamento de domain_uuid para nomes e descrições de domínio."""
|
||||||
connection = None
|
connection = None
|
||||||
@ -261,48 +144,23 @@ def get_domain_mapping():
|
|||||||
try:
|
try:
|
||||||
connection = get_db_connection()
|
connection = get_db_connection()
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
query = """
|
query = "SELECT domain_uuid, domain_name, domain_description FROM v_domains"
|
||||||
SELECT domain_uuid, domain_name, domain_description
|
|
||||||
FROM v_domains
|
|
||||||
"""
|
|
||||||
cursor.execute(query)
|
cursor.execute(query)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
domain_mapping = {
|
return {
|
||||||
row[0]: {"name": row[1], "description": row[2] or "unknown"} for row in results
|
row[0]: {"name": row[1], "description": row[2] or "unknown"}
|
||||||
|
for row in results
|
||||||
}
|
}
|
||||||
return domain_mapping
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"Erro ao consultar o banco de dados para domínios: {e}")
|
log(f"Erro ao consultar o banco de dados para domínios: {e}")
|
||||||
return {}
|
return {}
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
if cursor:
|
if cursor: cursor.close()
|
||||||
cursor.close()
|
if connection: connection.close()
|
||||||
if connection:
|
|
||||||
connection.close()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def merge_registered_and_unregistered(all_users, registered_users):
|
|
||||||
"""Combina ramais registrados e não registrados, filtrando por domínio (nome)."""
|
|
||||||
# O registered_map usa (User, Domain) onde Domain é o NOME do domínio
|
|
||||||
registered_map = {
|
|
||||||
(user["User"], user["Domain"]): user
|
|
||||||
for user in registered_users
|
|
||||||
}
|
|
||||||
|
|
||||||
merged_users = []
|
|
||||||
for user in all_users:
|
|
||||||
# Compare pelo nome do domínio
|
|
||||||
reg = registered_map.get((user["User"], user["Domain"]))
|
|
||||||
merged_users.append({
|
|
||||||
"User": user["User"],
|
|
||||||
"Domain": user["DomainUUID"], # Para agrupamento e uso do UUID
|
|
||||||
"Status": reg["Status"] if reg else "Sem registro",
|
|
||||||
"Agent": reg["Agent"] if reg and "Agent" in reg else ""
|
|
||||||
})
|
|
||||||
|
|
||||||
return merged_users
|
|
||||||
|
|
||||||
def get_trunks_by_domain():
|
def get_trunks_by_domain():
|
||||||
"""Obtém os troncos registrados por domínio."""
|
"""Obtém os troncos registrados por domínio."""
|
||||||
@ -311,33 +169,25 @@ def get_trunks_by_domain():
|
|||||||
try:
|
try:
|
||||||
connection = get_db_connection()
|
connection = get_db_connection()
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
query = """
|
query = "SELECT domain_uuid, gateway FROM v_gateways WHERE enabled = 'true'"
|
||||||
SELECT domain_uuid, gateway, enabled
|
|
||||||
FROM v_gateways
|
|
||||||
WHERE enabled = 'true'
|
|
||||||
"""
|
|
||||||
cursor.execute(query)
|
cursor.execute(query)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
trunks_by_domain = {}
|
trunks_by_domain = {}
|
||||||
for row in results:
|
for row in results:
|
||||||
domain_uuid = row[0]
|
domain_uuid, gateway = row[0], row[1]
|
||||||
gateway = row[1]
|
trunks_by_domain.setdefault(domain_uuid, []).append(gateway)
|
||||||
if domain_uuid not in trunks_by_domain:
|
|
||||||
trunks_by_domain[domain_uuid] = []
|
|
||||||
trunks_by_domain[domain_uuid].append(gateway)
|
|
||||||
return trunks_by_domain
|
return trunks_by_domain
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"Erro ao consultar os troncos no banco de dados: {e}")
|
log(f"Erro ao consultar os troncos no banco de dados: {e}")
|
||||||
return {}
|
return {}
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
if cursor:
|
if cursor: cursor.close()
|
||||||
cursor.close()
|
if connection: connection.close()
|
||||||
if connection:
|
|
||||||
connection.close()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def get_destinations_by_domain():
|
def get_destinations_by_domain():
|
||||||
"""Obtém os destinos diretos de ramal (DDR) por domínio, sem duplicidades."""
|
"""Obtém os destinos diretos de ramal (DDR) por domínio, sem duplicidades."""
|
||||||
connection = None
|
connection = None
|
||||||
@ -345,63 +195,223 @@ def get_destinations_by_domain():
|
|||||||
try:
|
try:
|
||||||
connection = get_db_connection()
|
connection = get_db_connection()
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
query = """
|
query = "SELECT domain_uuid, destination_number, destination_enabled FROM v_destinations"
|
||||||
SELECT domain_uuid, destination_number, destination_enabled
|
|
||||||
FROM v_destinations
|
|
||||||
"""
|
|
||||||
cursor.execute(query)
|
cursor.execute(query)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
destinations_by_domain = {}
|
destinations_by_domain = {}
|
||||||
for row in results:
|
for row in results:
|
||||||
domain_uuid = row[0]
|
domain_uuid, destination_number, enabled = row
|
||||||
destination_number = row[1]
|
entry = f"{destination_number} ({'Habilitado' if enabled == 'true' else 'Desabilitado'})"
|
||||||
enabled = row[2]
|
destinations_by_domain.setdefault(domain_uuid, set()).add(entry)
|
||||||
destination_entry = f"{destination_number} ({'Habilitado' if enabled == 'true' else 'Desabilitado'})"
|
|
||||||
if domain_uuid not in destinations_by_domain:
|
|
||||||
destinations_by_domain[domain_uuid] = set() # Usar um conjunto para evitar duplicidades
|
|
||||||
destinations_by_domain[domain_uuid].add(destination_entry) # Adiciona ao conjunto
|
|
||||||
|
|
||||||
# Converte os conjuntos para listas ordenadas
|
|
||||||
for domain_uuid in destinations_by_domain:
|
for domain_uuid in destinations_by_domain:
|
||||||
destinations_by_domain[domain_uuid] = sorted(destinations_by_domain[domain_uuid])
|
destinations_by_domain[domain_uuid] = sorted(destinations_by_domain[domain_uuid])
|
||||||
|
|
||||||
return destinations_by_domain
|
return destinations_by_domain
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"Erro ao consultar os destinos no banco de dados: {e}")
|
log(f"Erro ao consultar os destinos no banco de dados: {e}")
|
||||||
return {}
|
return {}
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
if cursor:
|
if cursor: cursor.close()
|
||||||
cursor.close()
|
if connection: connection.close()
|
||||||
if connection:
|
|
||||||
connection.close()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def main():
|
|
||||||
|
def merge_registered_and_unregistered(all_users, registered_users):
|
||||||
|
"""Combina ramais registrados e não registrados."""
|
||||||
|
registered_map = {
|
||||||
|
(user["User"], user["Domain"]): user
|
||||||
|
for user in registered_users
|
||||||
|
}
|
||||||
|
merged_users = []
|
||||||
|
for user in all_users:
|
||||||
|
reg = registered_map.get((user["User"], user["Domain"]))
|
||||||
|
merged_users.append({
|
||||||
|
"User": user["User"],
|
||||||
|
"Domain": user["DomainUUID"],
|
||||||
|
"Status": reg["Status"] if reg else "Sem registro",
|
||||||
|
"Agent": reg.get("Agent", "") if reg else ""
|
||||||
|
})
|
||||||
|
return merged_users
|
||||||
|
|
||||||
|
|
||||||
|
def save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain):
|
||||||
|
"""Salva os registros do dia em arquivos CSV separados por domínio."""
|
||||||
|
today = datetime.now().strftime("%Y%m%d")
|
||||||
|
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "csv_registrations", today)
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
|
||||||
|
grouped_by_domain = {}
|
||||||
|
for registro in all_users_with_status:
|
||||||
|
domain_uuid = registro.get("Domain", "unknown")
|
||||||
|
grouped_by_domain.setdefault(domain_uuid, []).append(registro)
|
||||||
|
|
||||||
|
for domain_uuid, domain_registros in grouped_by_domain.items():
|
||||||
|
domain_info = domain_mapping.get(domain_uuid, {"name": "unknown", "description": "unknown"})
|
||||||
|
domain_description = domain_info["description"]
|
||||||
|
sanitized = domain_description.replace("/", "_").replace("\\", "_").replace(" ", "_")
|
||||||
|
filename = os.path.join(output_dir, f"{sanitized}.csv")
|
||||||
|
|
||||||
|
trunks = trunks_by_domain.get(domain_uuid, [])
|
||||||
|
trunks_str = ", ".join(trunks) if trunks else "Nenhum tronco registrado"
|
||||||
|
destinations = destinations_by_domain.get(domain_uuid, [])
|
||||||
|
destinations_str = ", ".join(destinations) if destinations else "Nenhum DDR registrado"
|
||||||
|
|
||||||
|
with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
|
||||||
|
writer = csv.writer(csvfile, delimiter=";")
|
||||||
|
writer.writerow(["Ramal", "Domínio", "Status", "Dispositivo", "Troncos", "DDR"])
|
||||||
|
seen = set()
|
||||||
|
for registro in domain_registros:
|
||||||
|
key = (registro["User"], domain_uuid)
|
||||||
|
if key not in seen:
|
||||||
|
seen.add(key)
|
||||||
|
writer.writerow([
|
||||||
|
registro.get("User", ""),
|
||||||
|
f"{domain_info['name']} ({domain_description})",
|
||||||
|
registro.get("Status", ""),
|
||||||
|
registro.get("Agent", ""),
|
||||||
|
trunks_str,
|
||||||
|
destinations_str
|
||||||
|
])
|
||||||
|
|
||||||
|
log(f"CSV diário gerado para domínio '{domain_description}': {filename}")
|
||||||
|
|
||||||
|
return output_dir
|
||||||
|
|
||||||
|
|
||||||
|
def generate_weekly_report():
|
||||||
|
"""
|
||||||
|
Gera relatório semanal consolidado.
|
||||||
|
Lê os CSVs dos últimos 7 dias e consolida por ramal/domínio.
|
||||||
|
Um ramal é contado como 'Presente' se apareceu registrado em ao menos 1 dia.
|
||||||
|
"""
|
||||||
|
today = datetime.now()
|
||||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
load_env(os.path.join(script_dir, ".env"))
|
base_dir = os.path.join(script_dir, "csv_registrations")
|
||||||
# Obter registros do FreeSWITCH
|
weekly_dir = os.path.join(script_dir, "csv_semanal")
|
||||||
|
os.makedirs(weekly_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# Coleta os últimos 7 dias
|
||||||
|
days = [(today - timedelta(days=i)).strftime("%Y%m%d") for i in range(6, -1, -1)]
|
||||||
|
week_label = f"{days[0]}_a_{days[-1]}"
|
||||||
|
|
||||||
|
log(f"Gerando relatório semanal para o período {days[0]} a {days[-1]}")
|
||||||
|
|
||||||
|
# Estrutura: { domain_description: { ramal: { dias_presente, agent_mais_recente, domain_label } } }
|
||||||
|
weekly_data = {}
|
||||||
|
|
||||||
|
for day in days:
|
||||||
|
day_dir = os.path.join(base_dir, day)
|
||||||
|
if not os.path.exists(day_dir):
|
||||||
|
log(f" Sem dados para o dia {day}, pulando.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
for csv_file in os.listdir(day_dir):
|
||||||
|
if not csv_file.endswith(".csv"):
|
||||||
|
continue
|
||||||
|
domain_key = csv_file.replace(".csv", "")
|
||||||
|
filepath = os.path.join(day_dir, csv_file)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(filepath, "r", encoding="utf-8-sig") as f:
|
||||||
|
reader = csv.DictReader(f, delimiter=";")
|
||||||
|
for row in reader:
|
||||||
|
ramal = row.get("Ramal", "").strip()
|
||||||
|
status = row.get("Status", "").strip()
|
||||||
|
agent = row.get("Dispositivo", "").strip()
|
||||||
|
domain_label = row.get("Domínio", "").strip()
|
||||||
|
|
||||||
|
if not ramal:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if domain_key not in weekly_data:
|
||||||
|
weekly_data[domain_key] = {}
|
||||||
|
|
||||||
|
if ramal not in weekly_data[domain_key]:
|
||||||
|
weekly_data[domain_key][ramal] = {
|
||||||
|
"dias_presente": 0,
|
||||||
|
"agent_mais_recente": "",
|
||||||
|
"domain_label": domain_label
|
||||||
|
}
|
||||||
|
|
||||||
|
# Conta como presente se tiver qualquer status de registro
|
||||||
|
if status and status != "Sem registro":
|
||||||
|
weekly_data[domain_key][ramal]["dias_presente"] += 1
|
||||||
|
# Atualiza o agent para o mais recente (último dia processado)
|
||||||
|
if agent:
|
||||||
|
weekly_data[domain_key][ramal]["agent_mais_recente"] = agent
|
||||||
|
|
||||||
|
if domain_label:
|
||||||
|
weekly_data[domain_key][ramal]["domain_label"] = domain_label
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
log(f" Erro ao ler {filepath}: {e}")
|
||||||
|
|
||||||
|
if not weekly_data:
|
||||||
|
log("Nenhum dado encontrado para gerar relatório semanal.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Gera um CSV por domínio
|
||||||
|
for domain_key, ramais in weekly_data.items():
|
||||||
|
filename = os.path.join(weekly_dir, f"semanal_{domain_key}_{week_label}.csv")
|
||||||
|
|
||||||
|
with open(filename, "w", newline="", encoding="utf-8-sig") as csvfile:
|
||||||
|
writer = csv.writer(csvfile, delimiter=";")
|
||||||
|
writer.writerow([
|
||||||
|
"Ramal",
|
||||||
|
"Domínio",
|
||||||
|
"Status Semanal",
|
||||||
|
"Dias Presente (de 7)",
|
||||||
|
"Dispositivo (mais recente)"
|
||||||
|
])
|
||||||
|
for ramal, info in sorted(ramais.items()):
|
||||||
|
status_semanal = "Presente" if info["dias_presente"] >= 1 else "Ausente"
|
||||||
|
writer.writerow([
|
||||||
|
ramal,
|
||||||
|
info["domain_label"],
|
||||||
|
status_semanal,
|
||||||
|
info["dias_presente"],
|
||||||
|
info["agent_mais_recente"]
|
||||||
|
])
|
||||||
|
|
||||||
|
log(f"Relatório semanal gerado: {filename}")
|
||||||
|
|
||||||
|
log(f"Relatório semanal concluído. {len(weekly_data)} domínios processados.")
|
||||||
|
|
||||||
|
|
||||||
|
def is_sunday():
|
||||||
|
"""Retorna True se hoje for domingo."""
|
||||||
|
return datetime.now().weekday() == 6
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
load_env()
|
||||||
|
|
||||||
|
log("=== Iniciando coleta diária ===")
|
||||||
|
|
||||||
|
# Obtém registros do FreeSWITCH
|
||||||
output = get_registrations()
|
output = get_registrations()
|
||||||
registered_users = parse_registrations(output) if output else []
|
registered_users = parse_registrations(output) if output else []
|
||||||
|
|
||||||
# Obter todos os ramais do banco de dados
|
# Obtém dados do banco
|
||||||
all_users = get_all_users_from_db()
|
all_users = get_all_users_from_db()
|
||||||
|
|
||||||
# Obter o mapeamento de domain_uuid para nomes e descrições de domínio
|
|
||||||
domain_mapping = get_domain_mapping()
|
domain_mapping = get_domain_mapping()
|
||||||
|
|
||||||
# Obter os troncos registrados por domínio
|
|
||||||
trunks_by_domain = get_trunks_by_domain()
|
trunks_by_domain = get_trunks_by_domain()
|
||||||
|
|
||||||
# Obter os destinos diretos de ramal (DDR) por domínio
|
|
||||||
destinations_by_domain = get_destinations_by_domain()
|
destinations_by_domain = get_destinations_by_domain()
|
||||||
|
|
||||||
# Combinar ramais registrados e não registrados
|
# Combina registrados e não registrados
|
||||||
all_users_with_status = merge_registered_and_unregistered(all_users, registered_users)
|
all_users_with_status = merge_registered_and_unregistered(all_users, registered_users)
|
||||||
|
|
||||||
# Salvar os resultados em arquivos CSV separados por domínio
|
# Salva CSV do dia
|
||||||
save_to_csv_by_domain(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain)
|
save_daily_csv(all_users_with_status, domain_mapping, trunks_by_domain, destinations_by_domain)
|
||||||
|
|
||||||
|
log("=== Coleta diária concluída ===")
|
||||||
|
|
||||||
|
# Se for domingo, gera o relatório semanal consolidado
|
||||||
|
if is_sunday():
|
||||||
|
log("=== Domingo detectado — gerando relatório semanal ===")
|
||||||
|
generate_weekly_report()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
Loading…
Reference in New Issue
Block a user