Files
wol/app/app.py
Marius Mutu acf234c600 Major feature enhancement: Windows PowerShell network scanning integration
- Added Windows PowerShell network scanner with auto-detection and interactive mode
- Implemented dual scanning system (Windows + Linux fallback)
- Added computer management features (rename, delete, duplicate checking)
- Enhanced UI with modern responsive design and Romanian localization
- Added comprehensive Windows-Linux integration with WSL interop
- Improved error handling and user feedback throughout
- Added hot reload for development and comprehensive documentation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-05 17:27:27 +03:00

489 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import subprocess
import json
import re
from flask import Flask, render_template, request, jsonify
import socket
app = Flask(__name__)
CONFIG_FILE = '/data/wol-computers.conf'
class WOLManager:
def __init__(self):
self.ensure_config_exists()
def ensure_config_exists(self):
os.makedirs('/data', exist_ok=True)
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'w') as f:
f.write("# Format: name|mac|ip\n")
def load_computers(self):
computers = []
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split('|')
if len(parts) >= 2:
computer = {
'name': parts[0],
'mac': parts[1],
'ip': parts[2] if len(parts) > 2 else '',
'status': self.ping_computer(parts[2]) if len(parts) > 2 and parts[2] else 'unknown'
}
computers.append(computer)
return computers
def ping_computer(self, ip):
if not ip:
return 'unknown'
try:
result = subprocess.run(['ping', '-c', '1', '-W', '2', ip],
capture_output=True, timeout=5)
return 'online' if result.returncode == 0 else 'offline'
except:
return 'unknown'
def wake_computer(self, mac, name, ip=''):
try:
# Trimite magic packet
result = subprocess.run(['wakeonlan', mac], capture_output=True, text=True)
if result.returncode == 0:
# Dacă avem IP, verifică dacă s-a trezit
if ip:
for i in range(10): # 30 secunde total
if self.ping_computer(ip) == 'online':
return {
'success': True,
'message': f'{name} s-a trezit după {i*3} secunde!',
'status': 'online'
}
if i < 9: # Nu aștepta după ultima încercare
subprocess.run(['sleep', '3'])
return {
'success': True,
'message': f'Magic packet trimis pentru {name}, dar nu răspunde la ping',
'status': 'unknown'
}
else:
return {
'success': True,
'message': f'Magic packet trimis pentru {name}!',
'status': 'unknown'
}
else:
return {
'success': False,
'message': f'Eroare la trimiterea magic packet: {result.stderr}',
'status': 'error'
}
except Exception as e:
return {
'success': False,
'message': f'Eroare: {str(e)}',
'status': 'error'
}
def add_computer(self, name, mac, ip=''):
# Validare MAC
mac_pattern = r'^[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}$'
if not re.match(mac_pattern, mac):
return {'success': False, 'message': 'MAC address invalid!'}
# Verifică duplicate - încarcă toate calculatoarele existente
computers = self.load_computers()
# Verifică dacă există deja un calculator cu același MAC
for computer in computers:
if computer['mac'].lower() == mac.lower():
return {'success': False, 'message': f'Un calculator cu MAC {mac} există deja: {computer["name"]}'}
# Verifică dacă există deja un calculator cu același nume
for computer in computers:
if computer['name'].lower() == name.lower():
return {'success': False, 'message': f'Un calculator cu numele "{name}" există deja'}
# Verifică dacă există deja un calculator cu același IP (dacă IP-ul este furnizat)
if ip and ip.strip():
for computer in computers:
if computer.get('ip') and computer['ip'].lower() == ip.lower():
return {'success': False, 'message': f'Un calculator cu IP {ip} există deja: {computer["name"]}'}
# Adaugă în fișier
with open(CONFIG_FILE, 'a') as f:
f.write(f"{name}|{mac}|{ip}\n")
return {'success': True, 'message': f'Calculator {name} adăugat!'}
def rename_computer(self, old_name, new_name):
if not new_name.strip():
return {'success': False, 'message': 'Numele nou nu poate fi gol!'}
computers = []
found = False
# Citește toate computerele
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split('|')
if len(parts) >= 2:
if parts[0] == old_name:
# Redenumește computerul găsit
computers.append(f"{new_name}|{parts[1]}|{parts[2] if len(parts) > 2 else ''}")
found = True
else:
computers.append(line)
else:
computers.append(line)
if not found:
return {'success': False, 'message': f'Computerul {old_name} nu a fost găsit!'}
# Verifică dacă noul nume există deja
for computer_line in computers:
if not computer_line.startswith('#') and computer_line.strip():
parts = computer_line.split('|')
if len(parts) >= 2 and parts[0] == new_name and computer_line != f"{new_name}|{parts[1]}|{parts[2] if len(parts) > 2 else ''}":
return {'success': False, 'message': f'Numele {new_name} este deja folosit!'}
# Rescrie fișierul
with open(CONFIG_FILE, 'w') as f:
for computer_line in computers:
f.write(computer_line + '\n')
return {'success': True, 'message': f'Computerul a fost redenumit din {old_name} în {new_name}!'}
def delete_computer(self, name=None, mac=None):
if not name and not mac:
return {'success': False, 'message': 'Trebuie specificat numele sau MAC-ul computerului!'}
computers = []
found = False
deleted_name = None
# Citește toate computerele
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split('|')
if len(parts) >= 2:
# Verifică dacă este computerul de șters (prin nume sau MAC)
if (name and parts[0] == name) or (mac and parts[1].lower() == mac.lower()):
found = True
deleted_name = parts[0] if parts[0] else f"Calculator cu MAC {parts[1]}"
# Nu adaugă linia în lista, efectiv ștergând computerul
else:
computers.append(line)
else:
computers.append(line)
if not found:
identifier = name if name else f"MAC {mac}"
return {'success': False, 'message': f'Computerul cu {identifier} nu a fost găsit!'}
# Rescrie fișierul fără computerul șters
with open(CONFIG_FILE, 'w') as f:
for computer_line in computers:
f.write(computer_line + '\n')
return {'success': True, 'message': f'Computerul {deleted_name} a fost șters!'}
def scan_network(self, custom_network=None):
try:
# Încearcă să citească rezultatele scanului Windows mai întâi
windows_scan_result = self.try_read_windows_scan_results(custom_network)
if windows_scan_result:
return windows_scan_result
# Fallback la scanarea Linux tradițională
return self.scan_network_linux(custom_network)
except Exception as e:
return {'success': False, 'message': f'Eroare la scanare: {str(e)}'}
def try_read_windows_scan_results(self, custom_network=None):
"""Încearcă să citească rezultatele din scanul Windows"""
try:
import json
from datetime import datetime, timedelta
results_file = '/data/network-scan-results.json'
# Verifică dacă există fișierul cu rezultate
if not os.path.exists(results_file):
return None
# Citește rezultatele (cu suport pentru UTF-8 BOM din PowerShell)
with open(results_file, 'r', encoding='utf-8-sig') as f:
data = json.load(f)
# Verifică vârsta rezultatelor (nu mai vechi de 30 minute)
try:
timestamp = datetime.fromisoformat(data.get('timestamp', '').replace('Z', '+00:00'))
if datetime.now().replace(tzinfo=timestamp.tzinfo) - timestamp > timedelta(minutes=30):
return None # Rezultatele sunt prea vechi
except:
return None # Timestamp invalid
# Filtrează rezultatele pe baza rețelei specificate
computers = data.get('computers', [])
if custom_network and computers:
import ipaddress
try:
net = ipaddress.ip_network(custom_network, strict=False)
filtered_computers = []
for computer in computers:
if ipaddress.ip_address(computer.get('ip', '')) in net:
filtered_computers.append(computer)
computers = filtered_computers
except:
pass # Rămân toate computerele dacă validarea eșuează
# Returnează rezultatul adaptat
if not computers and custom_network:
return {
'success': True,
'computers': [],
'message': f'Nu s-au găsit dispozitive în rețeaua {custom_network}. Rulează scanul Windows pentru rezultate actualizate.'
}
message = data.get('message', f'Scanare Windows: găsite {len(computers)} dispozitive')
if custom_network:
message += f' în rețeaua {custom_network}'
return {
'success': True,
'computers': computers,
'message': message
}
except Exception as e:
# Nu afișa eroarea, încearcă scanarea Linux
return None
def scan_network_linux(self, custom_network=None):
"""Scanarea tradițională Linux pentru compatibilitate"""
if custom_network:
# Validează formatul rețelei CIDR
import ipaddress
try:
ipaddress.ip_network(custom_network, strict=False)
network = custom_network
except ValueError:
return {'success': False, 'message': f'Format rețea invalid: {custom_network}. Folosește format CIDR (ex: 192.168.1.0/24)'}
else:
# Detectează rețeaua locală folosind route (net-tools)
result = subprocess.run(['route', '-n'], capture_output=True, text=True)
network = '192.168.1.0/24' # default
for line in result.stdout.split('\n'):
if '0.0.0.0' in line and 'UG' in line: # default gateway
parts = line.split()
if len(parts) >= 2:
gateway = parts[1]
# Construiește rețeaua bazată pe gateway
network_parts = gateway.split('.')
network = f"{network_parts[0]}.{network_parts[1]}.{network_parts[2]}.0/24"
break
# Încearcă să obțină MAC addresses din tabela ARP pentru dispozitive cunoscute
arp_result = subprocess.run(['arp', '-a'], capture_output=True, text=True)
scanned = []
for line in arp_result.stdout.split('\n'):
# Regex pentru parsarea ARP
match = re.search(r'\((\d+\.\d+\.\d+\.\d+)\).*([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', line)
if match:
ip = match.group(1)
mac = match.group(2)
hostname = line.split()[0] if line.split() else '?'
# Verifică dacă IP-ul este în rețeaua specificată
if custom_network:
import ipaddress
try:
net = ipaddress.ip_network(custom_network, strict=False)
if ipaddress.ip_address(ip) in net:
scanned.append({
'ip': ip,
'mac': mac,
'hostname': hostname,
'status': self.ping_computer(ip)
})
except:
pass
else:
# Autodetectare - adaugă toate intrările ARP
scanned.append({
'ip': ip,
'mac': mac,
'hostname': hostname,
'status': self.ping_computer(ip)
})
# Dacă nu s-au găsit dispozitive și este o rețea specificată custom, returnează mesaj informativ
if custom_network and not scanned:
return {
'success': True,
'computers': [],
'message': f'Nu s-au găsit dispozitive cu MAC addresses în rețeaua {custom_network}. În Docker Desktop Windows, scanarea automată este limitată. Rulează scanul Windows sau folosește butonul " Adaugă Calculator" pentru a adăuga manual dispozitivele cu IP și MAC cunoscute.'
}
return {'success': True, 'computers': scanned}
wol_manager = WOLManager()
@app.route('/')
def index():
# Hot reload is working!
return render_template('index.html')
@app.route('/api/computers')
def get_computers():
return jsonify(wol_manager.load_computers())
@app.route('/api/wake', methods=['POST'])
def wake_computer():
data = request.get_json()
result = wol_manager.wake_computer(
data.get('mac'),
data.get('name'),
data.get('ip', '')
)
return jsonify(result)
@app.route('/api/wake-all', methods=['POST'])
def wake_all():
computers = wol_manager.load_computers()
results = []
for computer in computers:
result = wol_manager.wake_computer(
computer['mac'],
computer['name'],
computer.get('ip', '')
)
results.append({
'name': computer['name'],
'result': result
})
return jsonify({'results': results})
@app.route('/api/add', methods=['POST'])
def add_computer():
data = request.get_json()
result = wol_manager.add_computer(
data.get('name'),
data.get('mac'),
data.get('ip', '')
)
return jsonify(result)
@app.route('/api/rename', methods=['POST'])
def rename_computer():
data = request.get_json()
result = wol_manager.rename_computer(
data.get('old_name'),
data.get('new_name')
)
return jsonify(result)
@app.route('/api/delete', methods=['POST'])
def delete_computer():
data = request.get_json()
result = wol_manager.delete_computer(
name=data.get('name'),
mac=data.get('mac')
)
return jsonify(result)
@app.route('/api/scan', methods=['GET', 'POST'])
def scan_network():
network = None
if request.method == 'POST':
data = request.get_json()
network = data.get('network') if data else None
elif request.method == 'GET':
network = request.args.get('network')
result = wol_manager.scan_network(network)
return jsonify(result)
@app.route('/api/scan/windows', methods=['POST'])
def trigger_windows_scan():
"""Declanșează un scan Windows prin apelarea script-ului PowerShell"""
try:
data = request.get_json()
network = data.get('network') if data else None
# Încearcă să execute PowerShell prin WSL interop
script_path = '/scripts/windows-network-scan.ps1'
output_path = '/data/network-scan-results.json'
# Construiește comanda PowerShell prin WSL interop
cmd = ['/mnt/c/Windows/System32/WindowsPowerShell/v1.0/powershell.exe',
'-ExecutionPolicy', 'Bypass',
'-File', script_path,
'-OutputPath', output_path,
'-Verbose']
if network:
cmd.extend(['-Network', network])
# Încearcă să execute script-ul prin WSL interop
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0:
# Scanul a fost executat cu succes, citește rezultatele
scan_result = wol_manager.scan_network(network)
scan_result['message'] = 'Scan Windows executat automat cu succes!'
return jsonify(scan_result)
else:
# Dacă execuția automată eșuează, oferă instrucțiuni manuale
return jsonify({
'success': False,
'message': f'Execuția automată a eșuat: {result.stderr[:200]}...',
'auto_execution_failed': True,
'instructions': 'Rulează manual din Windows unul dintre comenzile:',
'commands': [
f'scripts\\scan-network.bat{" -network " + network if network else ""}',
f'scripts\\run-scan.ps1{" -Network " + network if network else ""}',
f'powershell.exe -ExecutionPolicy Bypass -File scripts\\windows-network-scan.ps1 -OutputPath data\\network-scan-results.json{" -Network " + network if network else ""}'
]
})
except (subprocess.TimeoutExpired, FileNotFoundError, PermissionError) as e:
# Execuția automată nu este posibilă, oferă instrucțiuni manuale
return jsonify({
'success': False,
'message': f'Execuția automată nu este disponibilă: {str(e)[:100]}',
'auto_execution_failed': True,
'instructions': 'Rulează manual din Windows unul dintre comenzile:',
'commands': [
f'scripts\\scan-network.bat{" -network " + network if network else ""}',
f'scripts\\run-scan.ps1{" -Network " + network if network else ""}',
f'powershell.exe -ExecutionPolicy Bypass -File scripts\\windows-network-scan.ps1 -OutputPath data\\network-scan-results.json{" -Network " + network if network else ""}'
]
})
except Exception as e:
return jsonify({
'success': False,
'message': f'Eroare: {str(e)}',
'instructions': 'Rulează manual: scripts\\scan-network.bat'
})
if __name__ == '__main__':
# Enable debug mode for development with hot reload
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=True)