# /opt/ghostshield-pulse-f2b/agent/gs-pulse-agent.py import subprocess import re import json import sys import os import time import urllib.request import urllib.parse import threading import collections import socket import ssl # === КОНФИГУРАЦИЯ === API_URL = "https://pulse.bargcraft.top/api/telemetry" CONFIG_PATH = "/opt/ghostshield-pulse-f2b/.agent.env" NODE_ID = "gs-placeholder" NODE_TOKEN = "token-placeholder" # Префиксы логов iptables PREFIX_DROP = "GS_PULSE_DROP:" PREFIX_ACCEPT = "GS_PULSE_ACCEPT:" # Буфер логов для Дашборда log_buffer = collections.deque(maxlen=15) BACKEND_IP = "" def log(msg): """Кастомный логгер, который пишет в консоль и сохраняет в кольцевой буфер""" timestamp = time.strftime("%H:%M:%S") formatted = f"[{timestamp}] {msg}" print(formatted) log_buffer.append(formatted) def load_config(): """Загрузка конфигурации из файла для предотвращения утечки токена через CLI""" global NODE_ID, NODE_TOKEN if os.path.exists(CONFIG_PATH): try: with open(CONFIG_PATH, "r") as f: for line in f: if "=" in line: key, value = line.strip().split("=", 1) if key == "NODE_ID": NODE_ID = value.strip('"') elif key == "NODE_TOKEN": NODE_TOKEN = value.strip('"') log("[+] Конфигурация загружена из файла.") except Exception as e: log(f"[-] Ошибка чтения конфига: {e}") def resolve_backend_ip(): """Получает IP бэкенда для добавления в локальный вайтлист""" global BACKEND_IP try: domain = urllib.parse.urlparse(API_URL).netloc.split(':')[0] BACKEND_IP = socket.gethostbyname(domain) log(f"[*] IP бэкенда определен как {BACKEND_IP} (внесен в белый список)") except Exception as e: log(f"[-] Не удалось определить IP бэкенда: {e}") def run_command(cmd_list): """Безопасный запуск системных команд через список аргументов""" try: result = subprocess.run(cmd_list, capture_output=True, text=True, shell=False) return result.returncode == 0 except Exception as e: log(f"[-] Ошибка выполнения команды {cmd_list[0]}: {e}") return False def setup_iptables(): log("[*] Настройка ловушек iptables...") # Ограничиваем частоту логов для DROP run_command(["iptables", "-I", "INPUT", "-m", "limit", "--limit", "5/min", "-j", "LOG", "--log-prefix", f"{PREFIX_DROP} "]) # Логируем новые TCP соединения run_command(["iptables", "-I", "INPUT", "-p", "tcp", "--syn", "-j", "LOG", "--log-prefix", f"{PREFIX_ACCEPT} "]) # Аналогично для IPv6 run_command(["ip6tables", "-I", "INPUT", "-m", "limit", "--limit", "5/min", "-j", "LOG", "--log-prefix", f"{PREFIX_DROP} "]) log("[+] Правила применены.") def get_sys_metrics(): """Сбор метрик системы без установки сторонних библиотек""" cpu, ram = 0.0, 0.0 try: if os.path.exists('/proc/loadavg'): with open('/proc/loadavg', 'r') as f: cpu = float(f.read().split()[0]) if os.path.exists('/proc/meminfo'): with open('/proc/meminfo', 'r') as f: mem = f.read() total = int(re.search(r'MemTotal:\s+(\d+)', mem).group(1)) avail = int(re.search(r'MemAvailable:\s+(\d+)', mem).group(1)) ram = round((total - avail) / total * 100, 1) except Exception: pass f2b_status = subprocess.run(["systemctl", "is-active", "fail2ban"], capture_output=True, text=True).stdout.strip() f2b_active = (f2b_status == "active") return cpu, ram, f2b_active def execute_task(task): """Исполнитель задач с валидацией входных данных (защита от RCE)""" task_id = task.get("task_id") cmd = task.get("command") payload = str(task.get("payload", "")) log(f"[*] Выполнение задачи #{task_id}: {cmd} {payload}") # Валидация IP для команд бана/разбана ip_pattern = r'^([0-9a-fA-F]{1,4}:){1,7}[0-9a-fA-F]{1,4}$|^\d{1,3}(\.\d{1,3}){3}$' if cmd == "install_f2b": log("[*] Установка Fail2Ban...") subprocess.run(["apt-get", "update"], check=False) subprocess.run(["apt-get", "install", "-y", "fail2ban"], check=False) jail_local = "[sshd]\nenabled = true\nport = ssh\nfilter = sshd\nlogpath = /var/log/auth.log\nmaxretry = 3\nbantime = 3600\n" try: with open("/etc/fail2ban/jail.local", "w") as f: f.write(jail_local) subprocess.run(["systemctl", "restart", "fail2ban"], check=False) log("[+] Fail2Ban установлен и запущен.") except Exception as e: log(f"[-] Ошибка настройки F2B: {e}") elif cmd == "ban_ip": if re.match(ip_pattern, payload): run_command(["fail2ban-client", "set", "sshd", "banip", payload]) log(f"[+] IP {payload} забанен.") else: log(f"[-] Отмена: Некорректный IP {payload}") elif cmd == "unban_ip": if re.match(ip_pattern, payload): run_command(["fail2ban-client", "set", "sshd", "unbanip", payload]) log(f"[+] IP {payload} разбанен.") else: log(f"[-] Отмена: Некорректный IP {payload}") def send_telemetry(event_data=None): """Отправка телеметрии и получение задач (Polling)""" url = f"{API_URL}/{NODE_ID}" cpu, ram, f2b = get_sys_metrics() payload = { "token": NODE_TOKEN, "agent_logs": list(log_buffer), "cpu_load": cpu, "ram_usage": ram, "f2b_active": f2b } if event_data: payload.update(event_data) else: payload.update({"event_type": "heartbeat", "ip": "heartbeat"}) try: req = urllib.request.Request(url) req.add_header('Content-Type', 'application/json; charset=utf-8') jsondata = json.dumps(payload).encode('utf-8') # Используем стандартный контекст SSL context = ssl.create_default_context() with urllib.request.urlopen(req, jsondata, timeout=10, context=context) as res: if res.getcode() == 200: resp_data = json.loads(res.read().decode('utf-8')) commands = resp_data.get("commands", []) for cmd in commands: execute_task(cmd) except Exception as e: log(f"[-] Ошибка связи: {e}") def monitor_iptables(): """Поток 1: Мониторинг логов ядра (journalctl)""" cmd = ["journalctl", "-k", "-f"] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) while True: line = process.stdout.readline() if not line: break event_type = "drop" if PREFIX_DROP in line else "accept_new" if PREFIX_ACCEPT in line else None if not event_type: continue try: src_ip_match = re.search(r'SRC=([0-9a-fA-F\.:]+)', line) dst_port_match = re.search(r'DPT=([0-9]+)', line) proto_match = re.search(r'PROTO=([a-zA-Z]+)', line) if not src_ip_match: continue src_ip = src_ip_match.group(1) dst_port = int(dst_port_match.group(1)) if dst_port_match else 0 proto = proto_match.group(1).lower() if proto_match else "unknown" if src_ip == BACKEND_IP: continue log(f"[!] {event_type.upper()} | IP: {src_ip} | Port: {dst_port}") send_telemetry({"ip": src_ip, "port": dst_port, "proto": proto, "event_type": event_type}) except Exception as e: log(f"[-] Ошибка парсинга строки лога: {e}") def monitor_f2b(): """Поток 2: Мониторинг логов Fail2Ban""" log_path = "/var/log/fail2ban.log" while not os.path.exists(log_path): time.sleep(10) process = subprocess.Popen(['tail', '-F', log_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) while True: line = process.stdout.readline() if not line: break if "Ban " in line and "Restore Ban" not in line: try: ip_match = re.search(r'Ban\s+([0-9a-fA-F\.:]+)', line) if ip_match: ip = ip_match.group(1) log(f"[!] AUTO_FAIL2BAN | IP: {ip}") send_telemetry({"ip": ip, "port": 0, "proto": "tcp", "event_type": "AUTO_FAIL2BAN"}) except Exception: pass def heartbeat_worker(): """Поток 3: Пинг бэкенда каждые 30 сек""" while True: send_telemetry() time.sleep(30) def main(): if os.getuid() != 0: print("!!! Ошибка: Агент должен быть запущен от имени root (sudo) !!!") sys.exit(1) load_config() log("=== GhostShield Pulse F2B Agent ===") log(f"ID Узла: {NODE_ID}") resolve_backend_ip() setup_iptables() # Запускаем фоновые потоки threading.Thread(target=heartbeat_worker, daemon=True).start() threading.Thread(target=monitor_f2b, daemon=True).start() log(f"[*] Мониторинг запущен. Ожидание атак...") try: monitor_iptables() except KeyboardInterrupt: log("\n[*] Остановка агента...") sys.exit(0) if __name__ == "__main__": main()