Initial commit - WOL Manager Flask application
- Added containerized Flask web application for Wake-on-LAN management - Implemented computer management with file-based configuration - Added network scanning and device discovery functionality - Included Docker setup with privileged networking for WOL operations 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
200
app/app.py
Normal file
200
app/app.py
Normal file
@@ -0,0 +1,200 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
import re
|
||||
from flask import Flask, render_template, request, jsonify
|
||||
import socket
|
||||
|
||||
app = Flask(__name__)
|
||||
CONFIG_FILE = '/data/wol-computers.conf'
|
||||
|
||||
class WOLManager:
|
||||
def __init__(self):
|
||||
self.ensure_config_exists()
|
||||
|
||||
def ensure_config_exists(self):
|
||||
os.makedirs('/data', exist_ok=True)
|
||||
if not os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, 'w') as f:
|
||||
f.write("# Format: name|mac|ip\n")
|
||||
|
||||
def load_computers(self):
|
||||
computers = []
|
||||
if os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
parts = line.split('|')
|
||||
if len(parts) >= 2:
|
||||
computer = {
|
||||
'name': parts[0],
|
||||
'mac': parts[1],
|
||||
'ip': parts[2] if len(parts) > 2 else '',
|
||||
'status': self.ping_computer(parts[2]) if len(parts) > 2 and parts[2] else 'unknown'
|
||||
}
|
||||
computers.append(computer)
|
||||
return computers
|
||||
|
||||
def ping_computer(self, ip):
|
||||
if not ip:
|
||||
return 'unknown'
|
||||
try:
|
||||
result = subprocess.run(['ping', '-c', '1', '-W', '2', ip],
|
||||
capture_output=True, timeout=5)
|
||||
return 'online' if result.returncode == 0 else 'offline'
|
||||
except:
|
||||
return 'unknown'
|
||||
|
||||
def wake_computer(self, mac, name, ip=''):
|
||||
try:
|
||||
# Trimite magic packet
|
||||
result = subprocess.run(['wakeonlan', mac], capture_output=True, text=True)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Dacă avem IP, verifică dacă s-a trezit
|
||||
if ip:
|
||||
for i in range(10): # 30 secunde total
|
||||
if self.ping_computer(ip) == 'online':
|
||||
return {
|
||||
'success': True,
|
||||
'message': f'{name} s-a trezit după {i*3} secunde!',
|
||||
'status': 'online'
|
||||
}
|
||||
if i < 9: # Nu aștepta după ultima încercare
|
||||
subprocess.run(['sleep', '3'])
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'message': f'Magic packet trimis pentru {name}, dar nu răspunde la ping',
|
||||
'status': 'unknown'
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'success': True,
|
||||
'message': f'Magic packet trimis pentru {name}!',
|
||||
'status': 'unknown'
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'success': False,
|
||||
'message': f'Eroare la trimiterea magic packet: {result.stderr}',
|
||||
'status': 'error'
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'message': f'Eroare: {str(e)}',
|
||||
'status': 'error'
|
||||
}
|
||||
|
||||
def add_computer(self, name, mac, ip=''):
|
||||
# Validare MAC
|
||||
mac_pattern = r'^[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}$'
|
||||
if not re.match(mac_pattern, mac):
|
||||
return {'success': False, 'message': 'MAC address invalid!'}
|
||||
|
||||
# Adaugă în fișier
|
||||
with open(CONFIG_FILE, 'a') as f:
|
||||
f.write(f"{name}|{mac}|{ip}\n")
|
||||
|
||||
return {'success': True, 'message': f'Calculator {name} adăugat!'}
|
||||
|
||||
def scan_network(self):
|
||||
try:
|
||||
# Detectează rețeaua locală
|
||||
result = subprocess.run(['ip', 'route'], capture_output=True, text=True)
|
||||
network = '192.168.1.0/24' # default
|
||||
|
||||
for line in result.stdout.split('\n'):
|
||||
if 'default' in line:
|
||||
parts = line.split()
|
||||
if len(parts) >= 3:
|
||||
gateway = parts[2]
|
||||
# Construiește rețeaua bazată pe gateway
|
||||
network_parts = gateway.split('.')
|
||||
network = f"{network_parts[0]}.{network_parts[1]}.{network_parts[2]}.0/24"
|
||||
break
|
||||
|
||||
# Scanează rețeaua
|
||||
subprocess.run(['nmap', '-sn', network], capture_output=True, timeout=30)
|
||||
|
||||
# Citește ARP table
|
||||
result = subprocess.run(['arp', '-a'], capture_output=True, text=True)
|
||||
|
||||
scanned = []
|
||||
for line in result.stdout.split('\n'):
|
||||
# Regex pentru parsarea ARP
|
||||
match = re.search(r'\((\d+\.\d+\.\d+\.\d+)\).*([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', line)
|
||||
if match:
|
||||
ip = match.group(1)
|
||||
mac = match.group(2)
|
||||
hostname = line.split()[0] if line.split() else 'unknown'
|
||||
|
||||
scanned.append({
|
||||
'ip': ip,
|
||||
'mac': mac,
|
||||
'hostname': hostname,
|
||||
'status': self.ping_computer(ip)
|
||||
})
|
||||
|
||||
return {'success': True, 'computers': scanned}
|
||||
except Exception as e:
|
||||
return {'success': False, 'message': f'Eroare la scanare: {str(e)}'}
|
||||
|
||||
wol_manager = WOLManager()
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/api/computers')
|
||||
def get_computers():
|
||||
return jsonify(wol_manager.load_computers())
|
||||
|
||||
@app.route('/api/wake', methods=['POST'])
|
||||
def wake_computer():
|
||||
data = request.get_json()
|
||||
result = wol_manager.wake_computer(
|
||||
data.get('mac'),
|
||||
data.get('name'),
|
||||
data.get('ip', '')
|
||||
)
|
||||
return jsonify(result)
|
||||
|
||||
@app.route('/api/wake-all', methods=['POST'])
|
||||
def wake_all():
|
||||
computers = wol_manager.load_computers()
|
||||
results = []
|
||||
|
||||
for computer in computers:
|
||||
result = wol_manager.wake_computer(
|
||||
computer['mac'],
|
||||
computer['name'],
|
||||
computer.get('ip', '')
|
||||
)
|
||||
results.append({
|
||||
'name': computer['name'],
|
||||
'result': result
|
||||
})
|
||||
|
||||
return jsonify({'results': results})
|
||||
|
||||
@app.route('/api/add', methods=['POST'])
|
||||
def add_computer():
|
||||
data = request.get_json()
|
||||
result = wol_manager.add_computer(
|
||||
data.get('name'),
|
||||
data.get('mac'),
|
||||
data.get('ip', '')
|
||||
)
|
||||
return jsonify(result)
|
||||
|
||||
@app.route('/api/scan')
|
||||
def scan_network():
|
||||
result = wol_manager.scan_network()
|
||||
return jsonify(result)
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=8080, debug=False)
|
||||
Reference in New Issue
Block a user