#!/usr/bin/env python3 import os import subprocess import json import re from flask import Flask, render_template, request, jsonify import socket app = Flask(__name__) CONFIG_FILE = '/data/wol-computers.conf' class WOLManager: def __init__(self): self.ensure_config_exists() def ensure_config_exists(self): os.makedirs('/data', exist_ok=True) if not os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'w') as f: f.write("# Format: name|mac|ip\n") def load_computers(self): computers = [] if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split('|') if len(parts) >= 2: computer = { 'name': parts[0], 'mac': parts[1], 'ip': parts[2] if len(parts) > 2 else '', 'status': self.ping_computer(parts[2]) if len(parts) > 2 and parts[2] else 'unknown' } computers.append(computer) return computers def ping_computer(self, ip): if not ip: return 'unknown' try: result = subprocess.run(['ping', '-c', '1', '-W', '2', ip], capture_output=True, timeout=5) return 'online' if result.returncode == 0 else 'offline' except: return 'unknown' def wake_computer(self, mac, name, ip=''): try: # Trimite magic packet result = subprocess.run(['wakeonlan', mac], capture_output=True, text=True) if result.returncode == 0: # Dacă avem IP, verifică dacă s-a trezit if ip: for i in range(10): # 30 secunde total if self.ping_computer(ip) == 'online': return { 'success': True, 'message': f'{name} s-a trezit după {i*3} secunde!', 'status': 'online' } if i < 9: # Nu aștepta după ultima încercare subprocess.run(['sleep', '3']) return { 'success': True, 'message': f'Magic packet trimis pentru {name}, dar nu răspunde la ping', 'status': 'unknown' } else: return { 'success': True, 'message': f'Magic packet trimis pentru {name}!', 'status': 'unknown' } else: return { 'success': False, 'message': f'Eroare la trimiterea magic packet: {result.stderr}', 'status': 'error' } except Exception as e: return { 'success': False, 'message': f'Eroare: {str(e)}', 'status': 'error' } def add_computer(self, name, mac, ip=''): # Validare MAC mac_pattern = r'^[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}$' if not re.match(mac_pattern, mac): return {'success': False, 'message': 'MAC address invalid!'} # Verifică duplicate - încarcă toate calculatoarele existente computers = self.load_computers() # Verifică dacă există deja un calculator cu același MAC for computer in computers: if computer['mac'].lower() == mac.lower(): return {'success': False, 'message': f'Un calculator cu MAC {mac} există deja: {computer["name"]}'} # Verifică dacă există deja un calculator cu același nume for computer in computers: if computer['name'].lower() == name.lower(): return {'success': False, 'message': f'Un calculator cu numele "{name}" există deja'} # Verifică dacă există deja un calculator cu același IP (dacă IP-ul este furnizat) if ip and ip.strip(): for computer in computers: if computer.get('ip') and computer['ip'].lower() == ip.lower(): return {'success': False, 'message': f'Un calculator cu IP {ip} există deja: {computer["name"]}'} # Adaugă în fișier with open(CONFIG_FILE, 'a') as f: f.write(f"{name}|{mac}|{ip}\n") return {'success': True, 'message': f'Calculator {name} adăugat!'} def rename_computer(self, old_name, new_name): if not new_name.strip(): return {'success': False, 'message': 'Numele nou nu poate fi gol!'} computers = [] found = False # Citește toate computerele if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split('|') if len(parts) >= 2: if parts[0] == old_name: # Redenumește computerul găsit computers.append(f"{new_name}|{parts[1]}|{parts[2] if len(parts) > 2 else ''}") found = True else: computers.append(line) else: computers.append(line) if not found: return {'success': False, 'message': f'Computerul {old_name} nu a fost găsit!'} # Verifică dacă noul nume există deja for computer_line in computers: if not computer_line.startswith('#') and computer_line.strip(): parts = computer_line.split('|') if len(parts) >= 2 and parts[0] == new_name and computer_line != f"{new_name}|{parts[1]}|{parts[2] if len(parts) > 2 else ''}": return {'success': False, 'message': f'Numele {new_name} este deja folosit!'} # Rescrie fișierul with open(CONFIG_FILE, 'w') as f: for computer_line in computers: f.write(computer_line + '\n') return {'success': True, 'message': f'Computerul a fost redenumit din {old_name} în {new_name}!'} def delete_computer(self, name=None, mac=None): if not name and not mac: return {'success': False, 'message': 'Trebuie specificat numele sau MAC-ul computerului!'} computers = [] found = False deleted_name = None # Citește toate computerele if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split('|') if len(parts) >= 2: # Verifică dacă este computerul de șters (prin nume sau MAC) if (name and parts[0] == name) or (mac and parts[1].lower() == mac.lower()): found = True deleted_name = parts[0] if parts[0] else f"Calculator cu MAC {parts[1]}" # Nu adaugă linia în lista, efectiv ștergând computerul else: computers.append(line) else: computers.append(line) if not found: identifier = name if name else f"MAC {mac}" return {'success': False, 'message': f'Computerul cu {identifier} nu a fost găsit!'} # Rescrie fișierul fără computerul șters with open(CONFIG_FILE, 'w') as f: for computer_line in computers: f.write(computer_line + '\n') return {'success': True, 'message': f'Computerul {deleted_name} a fost șters!'} def edit_computer(self, old_name, new_name, new_mac, new_ip=None): """Editează un calculator existent - permite modificarea numelui, MAC-ului și IP-ului""" if not new_name.strip(): return {'success': False, 'message': 'Numele nou nu poate fi gol!'} if not new_mac.strip(): return {'success': False, 'message': 'Adresa MAC nu poate fi goală!'} # Validează formatul MAC-ului (XX:XX:XX:XX:XX:XX) import re mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' if not re.match(mac_pattern, new_mac): return {'success': False, 'message': 'Formatul MAC-ului este invalid! Folosește formatul XX:XX:XX:XX:XX:XX'} # Normalizează MAC-ul (lowercase și cu :) new_mac = new_mac.lower().replace('-', ':') computers = [] found = False old_mac = None # Citește toate computerele if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split('|') if len(parts) >= 2: if parts[0] == old_name: old_mac = parts[1] found = True # Verifică dacă noul MAC este diferit și nu există deja if new_mac != old_mac.lower(): # Verifică dacă noul MAC există deja la alt computer for check_line in computers: if not check_line.startswith('#') and check_line.strip(): check_parts = check_line.split('|') if len(check_parts) >= 2 and check_parts[1].lower() == new_mac: return {'success': False, 'message': f'Adresa MAC {new_mac} este deja folosită de alt computer!'} # Actualizează computerul cu noile valori new_ip_value = new_ip.strip() if new_ip else '' computers.append(f"{new_name}|{new_mac}|{new_ip_value}") else: # Verifică dacă noul MAC este folosit de alt computer if parts[1].lower() == new_mac and parts[0] != old_name: return {'success': False, 'message': f'Adresa MAC {new_mac} este deja folosită de computerul "{parts[0]}"!'} computers.append(line) else: computers.append(line) if not found: return {'success': False, 'message': f'Computerul "{old_name}" nu a fost găsit!'} # Verifică dacă noul nume există deja (doar dacă s-a schimbat numele) if new_name != old_name: for computer_line in computers: if not computer_line.startswith('#') and computer_line.strip(): parts = computer_line.split('|') if len(parts) >= 2 and parts[0] == new_name and not computer_line.startswith(f"{new_name}|{new_mac}|"): return {'success': False, 'message': f'Numele "{new_name}" este deja folosit de alt computer!'} # Rescrie fișierul with open(CONFIG_FILE, 'w') as f: for computer_line in computers: f.write(computer_line + '\n') return {'success': True, 'message': f'Computerul a fost actualizat cu succes!'} def scan_network(self, custom_network=None): try: # Încearcă să citească rezultatele scanului Windows mai întâi windows_scan_result = self.try_read_windows_scan_results(custom_network) if windows_scan_result: return windows_scan_result # Fallback la scanarea Linux tradițională return self.scan_network_linux(custom_network) except Exception as e: return {'success': False, 'message': f'Eroare la scanare: {str(e)}'} def try_read_windows_scan_results(self, custom_network=None): """Încearcă să citească rezultatele din scanul Windows""" try: import json from datetime import datetime, timedelta results_file = '/data/network-scan-results.json' # Verifică dacă există fișierul cu rezultate if not os.path.exists(results_file): return None # Citește rezultatele (cu suport pentru UTF-8 BOM din PowerShell) with open(results_file, 'r', encoding='utf-8-sig') as f: data = json.load(f) # Verifică vârsta rezultatelor (nu mai vechi de 30 minute) try: timestamp = datetime.fromisoformat(data.get('timestamp', '').replace('Z', '+00:00')) if datetime.now().replace(tzinfo=timestamp.tzinfo) - timestamp > timedelta(minutes=30): return None # Rezultatele sunt prea vechi except: return None # Timestamp invalid # Filtrează rezultatele pe baza rețelei specificate computers = data.get('computers', []) if custom_network and computers: import ipaddress try: net = ipaddress.ip_network(custom_network, strict=False) filtered_computers = [] for computer in computers: if ipaddress.ip_address(computer.get('ip', '')) in net: filtered_computers.append(computer) computers = filtered_computers except: pass # Rămân toate computerele dacă validarea eșuează # Returnează rezultatul adaptat if not computers and custom_network: return { 'success': True, 'computers': [], 'message': f'Nu s-au găsit dispozitive în rețeaua {custom_network}. Rulează scanul Windows pentru rezultate actualizate.' } message = data.get('message', f'Scanare Windows: găsite {len(computers)} dispozitive') if custom_network: message += f' în rețeaua {custom_network}' return { 'success': True, 'computers': computers, 'message': message } except Exception as e: # Nu afișa eroarea, încearcă scanarea Linux return None def scan_network_linux(self, custom_network=None): """Scanarea tradițională Linux pentru compatibilitate""" if custom_network: # Validează formatul rețelei CIDR import ipaddress try: ipaddress.ip_network(custom_network, strict=False) network = custom_network except ValueError: return {'success': False, 'message': f'Format rețea invalid: {custom_network}. Folosește format CIDR (ex: 192.168.1.0/24)'} else: # Detectează rețeaua locală folosind route (net-tools) result = subprocess.run(['route', '-n'], capture_output=True, text=True) network = '192.168.1.0/24' # default for line in result.stdout.split('\n'): if '0.0.0.0' in line and 'UG' in line: # default gateway parts = line.split() if len(parts) >= 2: gateway = parts[1] # Skip Docker bridge networks if gateway.startswith('172.17.') or gateway.startswith('172.18.') or gateway.startswith('172.19.') or gateway.startswith('172.20.'): continue # Construiește rețeaua bazată pe gateway network_parts = gateway.split('.') network = f"{network_parts[0]}.{network_parts[1]}.{network_parts[2]}.0/24" break # Încearcă să obțină MAC addresses din tabela ARP pentru dispozitive cunoscute arp_result = subprocess.run(['arp', '-a'], capture_output=True, text=True) scanned = [] for line in arp_result.stdout.split('\n'): # Regex pentru parsarea ARP match = re.search(r'\((\d+\.\d+\.\d+\.\d+)\).*([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', line) if match: ip = match.group(1) mac = match.group(2) hostname = line.split()[0] if line.split() else '?' # Skip Docker bridge networks if ip.startswith('172.17.') or ip.startswith('172.18.') or ip.startswith('172.19.') or ip.startswith('172.20.'): continue # Verifică dacă IP-ul este în rețeaua specificată if custom_network: import ipaddress try: net = ipaddress.ip_network(custom_network, strict=False) if ipaddress.ip_address(ip) in net: scanned.append({ 'ip': ip, 'mac': mac, 'hostname': hostname, 'status': self.ping_computer(ip) }) except: pass else: # Autodetectare - adaugă toate intrările ARP (fără Docker bridge) scanned.append({ 'ip': ip, 'mac': mac, 'hostname': hostname, 'status': self.ping_computer(ip) }) # Dacă nu s-au găsit dispozitive și este o rețea specificată custom, returnează mesaj informativ if custom_network and not scanned: return { 'success': True, 'computers': [], 'message': f'Nu s-au găsit dispozitive cu MAC addresses în rețeaua {custom_network}. În Docker Desktop Windows, scanarea automată este limitată. Rulează scanul Windows sau folosește butonul "➕ Adaugă Calculator" pentru a adăuga manual dispozitivele cu IP și MAC cunoscute.' } # Dacă nu s-au găsit dispozitive în autodetectare, probabil rulează în Docker if not custom_network and not scanned: return { 'success': True, 'computers': [], 'message': 'Nu s-au găsit dispozitive în rețeaua locală. Aplicația rulează în Docker cu acces limitat la rețea. Pentru rezultate complete, rulează scanul Windows din sistemul host sau specifică manual o rețea CIDR.' } return {'success': True, 'computers': scanned} wol_manager = WOLManager() @app.route('/') def index(): # Hot reload is working! return render_template('index.html') @app.route('/api/computers') def get_computers(): return jsonify(wol_manager.load_computers()) @app.route('/api/wake', methods=['POST']) def wake_computer(): data = request.get_json() result = wol_manager.wake_computer( data.get('mac'), data.get('name'), data.get('ip', '') ) return jsonify(result) @app.route('/api/wake-all', methods=['POST']) def wake_all(): computers = wol_manager.load_computers() results = [] for computer in computers: result = wol_manager.wake_computer( computer['mac'], computer['name'], computer.get('ip', '') ) results.append({ 'name': computer['name'], 'result': result }) return jsonify({'results': results}) @app.route('/api/add', methods=['POST']) def add_computer(): data = request.get_json() result = wol_manager.add_computer( data.get('name'), data.get('mac'), data.get('ip', '') ) return jsonify(result) @app.route('/api/rename', methods=['POST']) def rename_computer(): data = request.get_json() result = wol_manager.rename_computer( data.get('old_name'), data.get('new_name') ) return jsonify(result) @app.route('/api/edit', methods=['POST']) def edit_computer(): data = request.get_json() result = wol_manager.edit_computer( data.get('old_name'), data.get('new_name'), data.get('new_mac'), data.get('new_ip') ) return jsonify(result) @app.route('/api/delete', methods=['POST']) def delete_computer(): data = request.get_json() result = wol_manager.delete_computer( name=data.get('name'), mac=data.get('mac') ) return jsonify(result) @app.route('/api/scan', methods=['GET', 'POST']) def scan_network(): network = None if request.method == 'POST': data = request.get_json() network = data.get('network') if data else None elif request.method == 'GET': network = request.args.get('network') result = wol_manager.scan_network(network) return jsonify(result) @app.route('/api/scan/windows', methods=['POST']) def trigger_windows_scan(): """Declanșează un scan Windows prin apelarea script-ului PowerShell""" try: data = request.get_json() network = data.get('network') if data else None # Încearcă să execute PowerShell prin WSL interop script_path = '/scripts/windows-network-scan.ps1' output_path = '/data/network-scan-results.json' # Construiește comanda PowerShell prin WSL interop cmd = ['/mnt/c/Windows/System32/WindowsPowerShell/v1.0/powershell.exe', '-ExecutionPolicy', 'Bypass', '-File', script_path, '-OutputPath', output_path, '-Verbose'] if network: cmd.extend(['-Network', network]) # Încearcă să execute script-ul prin WSL interop try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if result.returncode == 0: # Scanul a fost executat cu succes, citește rezultatele scan_result = wol_manager.scan_network(network) scan_result['message'] = 'Scan Windows executat automat cu succes!' return jsonify(scan_result) else: # Dacă execuția automată eșuează, oferă instrucțiuni manuale return jsonify({ 'success': False, 'message': f'Execuția automată a eșuat: {result.stderr[:200]}...', 'auto_execution_failed': True, 'instructions': 'Rulează manual din Windows unul dintre comenzile:', 'commands': [ f'scripts\\scan-network.bat{" -network " + network if network else ""}', f'scripts\\run-scan.ps1{" -Network " + network if network else ""}', f'powershell.exe -ExecutionPolicy Bypass -File scripts\\windows-network-scan.ps1 -OutputPath data\\network-scan-results.json{" -Network " + network if network else ""}' ] }) except (subprocess.TimeoutExpired, FileNotFoundError, PermissionError) as e: # Execuția automată nu este posibilă, oferă instrucțiuni manuale return jsonify({ 'success': False, 'message': f'Execuția automată nu este disponibilă: {str(e)[:100]}', 'auto_execution_failed': True, 'instructions': 'Rulează manual din Windows unul dintre comenzile:', 'commands': [ f'scripts\\scan-network.bat{" -network " + network if network else ""}', f'scripts\\run-scan.ps1{" -Network " + network if network else ""}', f'powershell.exe -ExecutionPolicy Bypass -File scripts\\windows-network-scan.ps1 -OutputPath data\\network-scan-results.json{" -Network " + network if network else ""}' ] }) except Exception as e: return jsonify({ 'success': False, 'message': f'Eroare: {str(e)}', 'instructions': 'Rulează manual: scripts\\scan-network.bat' }) if __name__ == '__main__': # Get port from environment variable or default to 5000 port = int(os.environ.get('FLASK_PORT', 5000)) # Enable debug mode for development with hot reload app.run(host='0.0.0.0', port=port, debug=True, use_reloader=True)