Files
clawd/dashboard/api.py
2026-02-14 08:26:57 +00:00

2476 lines
100 KiB
Python

#!/usr/bin/env python3
"""
Simple API server for Echo Task Board.
Handles YouTube summarization requests.
"""
import json
import shutil
import subprocess
import sys
import re
import os
import signal
import uuid
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
from datetime import datetime
from pathlib import Path
# Import habits helpers
sys.path.insert(0, str(Path(__file__).parent))
import habits_helpers
# Directory layout: this file lives in <repo>/dashboard, so BASE_DIR is the repo root.
BASE_DIR = Path(__file__).parent.parent
TOOLS_DIR = BASE_DIR / 'tools'
NOTES_DIR = BASE_DIR / 'kb' / 'youtube'
KANBAN_DIR = BASE_DIR / 'dashboard'
# Root for user projects managed via the /api/workspace/* endpoints.
WORKSPACE_DIR = Path('/home/moltbot/workspace')
HABITS_FILE = KANBAN_DIR / 'habits.json'
# Eco (echo-core) constants
ECO_SERVICES = ['echo-core', 'echo-whatsapp-bridge', 'echo-taskboard']
ECHO_CORE_DIR = Path('/home/moltbot/echo-core')
ECHO_LOG_FILE = ECHO_CORE_DIR / 'logs' / 'echo-core.log'
ECHO_SESSIONS_FILE = ECHO_CORE_DIR / 'sessions' / 'active.json'
# Load .env file if present. Accepts KEY=VALUE lines; '#'-prefixed lines are
# comments. setdefault() means real environment variables win over .env values.
_env_file = Path(__file__).parent / '.env'
if _env_file.exists():
    for line in _env_file.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith('#') and '=' in line:
            k, v = line.split('=', 1)
            os.environ.setdefault(k.strip(), v.strip())
# Gitea configuration for the workspace git-push integration.
GITEA_URL = os.environ.get('GITEA_URL', 'https://gitea.romfast.ro')
GITEA_ORG = os.environ.get('GITEA_ORG', 'romfast')
GITEA_TOKEN = os.environ.get('GITEA_TOKEN', '')
class TaskBoardHandler(SimpleHTTPRequestHandler):
def do_POST(self):
    """Dispatch POST requests to the matching API handler; 404 otherwise."""
    exact_routes = {
        '/api/youtube': self.handle_youtube,
        '/api/files': self.handle_files_post,
        '/api/refresh-index': self.handle_refresh_index,
        '/api/git-commit': self.handle_git_commit,
        '/api/pdf': self.handle_pdf_post,
        '/api/habits': self.handle_habits_post,
        '/api/workspace/run': self.handle_workspace_run,
        '/api/workspace/stop': self.handle_workspace_stop,
        '/api/workspace/git/commit': self.handle_workspace_git_commit,
        '/api/workspace/git/push': self.handle_workspace_git_push,
        '/api/workspace/delete': self.handle_workspace_delete,
        '/api/eco/restart': self.handle_eco_restart,
        '/api/eco/stop': self.handle_eco_stop,
        '/api/eco/sessions/clear': self.handle_eco_sessions_clear,
    }
    handler = exact_routes.get(self.path)
    if handler is not None:
        handler()
        return
    # Habit check/skip endpoints embed the habit id in the path:
    # /api/habits/<id>/check and /api/habits/<id>/skip.
    if self.path.startswith('/api/habits/'):
        if self.path.endswith('/check'):
            self.handle_habits_check()
            return
        if self.path.endswith('/skip'):
            self.handle_habits_skip()
            return
    self.send_error(404)
def do_PUT(self):
    """Only /api/habits/<id> supports PUT (habit update); anything else is 404."""
    if not self.path.startswith('/api/habits/'):
        self.send_error(404)
        return
    self.handle_habits_put()
def do_DELETE(self):
    """DELETE routing: habit un-check when '/check' is in the path, else habit removal."""
    if not self.path.startswith('/api/habits/'):
        self.send_error(404)
        return
    if '/check' in self.path:
        self.handle_habits_uncheck()
    else:
        self.handle_habits_delete()
def handle_git_commit(self):
    """Run tools/git_commit.py --push for the repo and report the outcome as JSON."""
    try:
        result = subprocess.run(
            [sys.executable, str(TOOLS_DIR / 'git_commit.py'), '--push'],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(BASE_DIR)
        )
        combined = result.stdout + result.stderr
        # The script prints "Files changed: N"; default to 0 when absent.
        match = re.search(r'Files changed: (\d+)', combined)
        changed = int(match.group(1)) if match else 0
        # A push that got as far as "Pushing..." is treated as success even
        # if the process exit code was non-zero.
        if result.returncode == 0 or 'Pushing...' in combined:
            self.send_json({'success': True, 'files': changed, 'output': combined})
        else:
            self.send_json({'success': False, 'error': combined or 'Unknown error'})
    except Exception as e:
        self.send_json({'success': False, 'error': str(e)}, 500)
def handle_refresh_index(self):
    """Regenerate memory/kb/index.json"""
    try:
        result = subprocess.run(
            [sys.executable, str(TOOLS_DIR / 'update_notes_index.py')],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode != 0:
            self.send_json({
                'success': False,
                'error': result.stderr or 'Unknown error'
            }, 500)
            return
        out = result.stdout
        # The index tool reports "... with N notes"; extract N for the UI.
        m = re.search(r'with (\d+) notes', out)
        note_count = int(m.group(1)) if m else 0
        self.send_json({
            'success': True,
            'message': f'Index regenerat cu {note_count} notițe',
            'total': note_count,
            'output': out
        })
    except subprocess.TimeoutExpired:
        self.send_json({'success': False, 'error': 'Timeout'}, 500)
    except Exception as e:
        self.send_json({'success': False, 'error': str(e)}, 500)
def handle_files_post(self):
    """Save file content.

    POST body: {"path": <path relative to an allowed root>, "content": <text>}.
    The resolved target must live inside one of the allowed roots; parent
    directories are created as needed.
    """
    try:
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length).decode('utf-8')
        data = json.loads(post_data)
        path = data.get('path', '')
        content = data.get('content', '')
        # Allow access to clawd and workspace
        allowed_dirs = [
            Path('/home/moltbot/clawd'),
            Path('/home/moltbot/workspace')
        ]
        # Resolve against each allowed root; resolve() follows symlinks, so
        # a symlink escaping the roots is rejected by the containment check.
        target = None
        for base in allowed_dirs:
            try:
                candidate = (base / path).resolve()
            except OSError:
                continue
            # Containment check with a path-separator boundary: a plain
            # startswith() would also accept siblings such as
            # /home/moltbot/clawd-evil for the /home/moltbot/clawd root.
            if any(candidate == d or str(candidate).startswith(str(d) + os.sep)
                   for d in allowed_dirs):
                target = candidate
                break
        if target is None:
            self.send_json({'error': 'Access denied'}, 403)
            return
        # Create parent dirs if needed
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding='utf-8')
        self.send_json({
            'status': 'saved',
            'path': path,
            'size': len(content)
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_pdf_post(self):
    """Convert markdown to PDF (text-based, not image) using venv script.

    POST body: {"markdown": <text>, "filename": <download name>}.
    Delegates rendering to tools/generate_pdf.py under the project venv
    (JSON request on stdin, raw PDF bytes on stdout) and streams the
    result back as an attachment.
    """
    try:
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length).decode('utf-8')
        data = json.loads(post_data)
        markdown_content = data.get('markdown', '')
        filename = data.get('filename', 'document.pdf')
        if not markdown_content:
            self.send_json({'error': 'No markdown content'}, 400)
            return
        # Call PDF generator script in venv
        venv_python = BASE_DIR / 'venv' / 'bin' / 'python3'
        pdf_script = TOOLS_DIR / 'generate_pdf.py'
        if not venv_python.exists():
            self.send_json({'error': 'Venv Python not found'}, 500)
            return
        if not pdf_script.exists():
            self.send_json({'error': 'PDF generator script not found'}, 500)
            return
        # Prepare input JSON
        input_data = json.dumps({
            'markdown': markdown_content,
            'filename': filename
        })
        # Call script with stdin
        result = subprocess.run(
            [str(venv_python), str(pdf_script)],
            input=input_data.encode('utf-8'),
            capture_output=True,
            timeout=30
        )
        if result.returncode != 0:
            # The generator reports errors as JSON on stderr when it can;
            # fall back to the raw text otherwise.
            error_msg = result.stderr.decode('utf-8', errors='replace')
            try:
                error_json = json.loads(error_msg)
                self.send_json(error_json, 500)
            except ValueError:
                self.send_json({'error': error_msg}, 500)
            return
        # PDF bytes from stdout
        pdf_bytes = result.stdout
        # Send as file download. FIX: interpolate the requested filename —
        # previously the header contained a literal placeholder instead.
        self.send_response(200)
        self.send_header('Content-Type', 'application/pdf')
        self.send_header('Content-Disposition', f'attachment; filename="{filename}"')
        self.send_header('Content-Length', str(len(pdf_bytes)))
        self.end_headers()
        self.wfile.write(pdf_bytes)
    except subprocess.TimeoutExpired:
        self.send_json({'error': 'PDF generation timeout'}, 500)
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def do_GET(self):
    """Dispatch GET requests to API handlers; fall back to static file serving."""
    path = self.path

    def is_endpoint(base):
        # Exact endpoint match, optionally followed by a query string.
        return path == base or path.startswith(base + '?')

    if path == '/api/status':
        self.send_json({'status': 'ok', 'time': datetime.now().isoformat()})
    elif is_endpoint('/api/git'):
        self.handle_git_status()
    elif is_endpoint('/api/agents'):
        self.handle_agents_status()
    elif is_endpoint('/api/cron'):
        self.handle_cron_status()
    elif is_endpoint('/api/activity'):
        self.handle_activity()
    elif path == '/api/habits':
        self.handle_habits_get()
    elif path.startswith('/api/files'):
        self.handle_files_get()
    elif path.startswith('/api/diff'):
        self.handle_git_diff()
    elif is_endpoint('/api/workspace'):
        self.handle_workspace_list()
    elif path.startswith('/api/workspace/git/diff'):
        self.handle_workspace_git_diff()
    elif path.startswith('/api/workspace/logs'):
        self.handle_workspace_logs()
    elif is_endpoint('/api/eco/status'):
        self.handle_eco_status()
    elif is_endpoint('/api/eco/sessions'):
        self.handle_eco_sessions()
    elif path.startswith('/api/eco/sessions/content'):
        self.handle_eco_session_content()
    elif path.startswith('/api/eco/logs'):
        self.handle_eco_logs()
    elif path == '/api/eco/doctor':
        self.handle_eco_doctor()
    elif path.startswith('/api/'):
        # Unknown API route.
        self.send_error(404)
    else:
        # Serve static files
        super().do_GET()
def handle_git_status(self):
    """Report branch, last commit, and working-tree state of the clawd repo."""
    try:
        repo = Path('/home/moltbot/clawd')

        def git(*args, timeout=5):
            # Run a git subcommand in the repo and return stripped stdout.
            return subprocess.run(
                ['git', *args],
                cwd=repo, capture_output=True, text=True, timeout=timeout
            ).stdout.strip()

        branch = git('branch', '--show-current')
        last_commit = git('log', '-1', '--format=%h|%s|%cr')
        commit_parts = last_commit.split('|') if last_commit else ['', '', '']
        status_output = git('status', '--short')
        uncommitted = [entry for entry in status_output.split('\n') if entry.strip()] if status_output else []
        # Diff stats only when something is pending; prefer staged stats.
        diff_stat = ''
        if uncommitted:
            diff_stat = git('diff', '--stat', '--cached') or git('diff', '--stat')
        # Parse `git status --short` lines: two status chars, then the path.
        # Spacing after the status varies, so both halves are stripped.
        uncommitted_parsed = []
        for entry in uncommitted:
            if len(entry) >= 2:
                code = entry[:2].strip()
                file_path = entry[2:].strip()
                if file_path:
                    uncommitted_parsed.append({'status': code, 'path': file_path})
        self.send_json({
            'branch': branch,
            'lastCommit': {
                'hash': commit_parts[0] if len(commit_parts) > 0 else '',
                'message': commit_parts[1] if len(commit_parts) > 1 else '',
                'time': commit_parts[2] if len(commit_parts) > 2 else ''
            },
            'uncommitted': uncommitted,
            'uncommittedParsed': uncommitted_parsed,
            'uncommittedCount': len(uncommitted),
            'diffStat': diff_stat,
            'clean': len(uncommitted) == 0
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_git_diff(self):
    """Get git diff for a specific file (?path=<relative path>).

    Tries the staged diff first, then unstaged; untracked files are
    rendered as an all-additions pseudo-diff of their content (50 KB cap).
    """
    parsed = urlparse(self.path)
    params = parse_qs(parsed.query)
    filepath = params.get('path', [''])[0]
    if not filepath:
        self.send_json({'error': 'path required'}, 400)
        return
    try:
        workspace = Path('/home/moltbot/clawd')
        # Security check with a separator boundary: a plain startswith()
        # would also accept siblings like /home/moltbot/clawd-other.
        target = (workspace / filepath).resolve()
        if target != workspace and not str(target).startswith(str(workspace) + os.sep):
            self.send_json({'error': 'Access denied'}, 403)
            return
        # Get diff (try staged first, then unstaged)
        diff = subprocess.run(
            ['git', 'diff', '--cached', '--', filepath],
            cwd=workspace, capture_output=True, text=True, timeout=10
        ).stdout
        if not diff:
            diff = subprocess.run(
                ['git', 'diff', '--', filepath],
                cwd=workspace, capture_output=True, text=True, timeout=10
            ).stdout
        # If still no diff, file might be untracked - show full content
        if not diff:
            status = subprocess.run(
                ['git', 'status', '--short', '--', filepath],
                cwd=workspace, capture_output=True, text=True, timeout=5
            ).stdout.strip()
            if status.startswith('??') and target.exists():
                # Untracked file - present it as all added lines.
                content = target.read_text(encoding='utf-8', errors='replace')[:50000]
                diff = f"+++ b/{filepath}\n" + '\n'.join(f'+{line}' for line in content.split('\n'))
        self.send_json({
            'path': filepath,
            'diff': diff or 'No changes',
            'hasDiff': bool(diff)
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_agents_status(self):
    """Get agents status - fast version reading session files directly.

    An agent counts as active when any session in its
    ~/.clawdbot/agents/<id>/sessions/sessions.json was updated in the
    last 30 minutes (timestamps are epoch milliseconds).
    """
    try:
        # Known agents shown on the dashboard; 'id' must match the
        # directory name under ~/.clawdbot/agents/.
        agents_config = [
            {'id': 'echo', 'name': 'Echo', 'emoji': '🌀'},
            {'id': 'echo-work', 'name': 'Work', 'emoji': ''},
            {'id': 'echo-health', 'name': 'Health', 'emoji': '❤️'},
            {'id': 'echo-growth', 'name': 'Growth', 'emoji': '🪜'},
            {'id': 'echo-sprijin', 'name': 'Sprijin', 'emoji': ''},
            {'id': 'echo-scout', 'name': 'Scout', 'emoji': '⚜️'},
        ]
        active_agents = set()
        sessions_base = Path.home() / '.clawdbot' / 'agents'
        if sessions_base.exists():
            for agent_dir in sessions_base.iterdir():
                if not agent_dir.is_dir():
                    continue
                sessions_file = agent_dir / 'sessions' / 'sessions.json'
                if not sessions_file.exists():
                    continue
                try:
                    # sessions.json is an object keyed by session id.
                    data = json.loads(sessions_file.read_text())
                    now = datetime.now().timestamp() * 1000
                    for sess in data.values():
                        if isinstance(sess, dict):
                            last_active = sess.get('updatedAt', 0)
                            if now - last_active < 30 * 60 * 1000:  # 30 min
                                active_agents.add(agent_dir.name)
                                break
                except Exception:
                    # FIX: was a bare except. Unreadable or malformed session
                    # files just mark the agent inactive.
                    pass
        agents = [
            {
                'id': cfg['id'],
                'name': cfg['name'],
                'emoji': cfg['emoji'],
                'active': cfg['id'] in active_agents
            }
            for cfg in agents_config
        ]
        self.send_json({'agents': agents})
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_cron_status(self):
    """Get cron jobs status from ~/.clawdbot/cron/jobs.json

    Returns enabled jobs only, each with a display time (cron hour/minute
    converted from UTC to Europe/Bucharest), whether the job already ran
    today, and its last/next run metadata.
    """
    try:
        jobs_file = Path.home() / '.clawdbot' / 'cron' / 'jobs.json'
        if not jobs_file.exists():
            self.send_json({'jobs': [], 'error': 'No jobs file found'})
            return
        data = json.loads(jobs_file.read_text())
        all_jobs = data.get('jobs', [])
        # Filter enabled jobs and format for dashboard
        # NOTE(review): now_ms is computed but never used below.
        now_ms = datetime.now().timestamp() * 1000
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_start_ms = today_start.timestamp() * 1000
        jobs = []
        for job in all_jobs:
            if not job.get('enabled', False):
                continue
            # Parse cron expression to get time
            schedule = job.get('schedule', {})
            expr = schedule.get('expr', '')
            # Simple cron parsing for display - convert UTC to Bucharest
            parts = expr.split()
            if len(parts) >= 2:
                # Standard cron field order: minute first, hour second.
                minute = parts[0]
                hour = parts[1]
                if minute.isdigit() and (hour.isdigit() or '-' in hour):
                    # Handle hour ranges like "7-17"
                    if '-' in hour:
                        hour_start, hour_end = hour.split('-')
                        hour = hour_start  # Show first hour
                    # Convert UTC to Bucharest (UTC+2 winter, UTC+3 summer)
                    from datetime import timezone as dt_timezone
                    from zoneinfo import ZoneInfo
                    try:
                        bucharest = ZoneInfo('Europe/Bucharest')
                        utc_hour = int(hour)
                        utc_minute = int(minute)
                        # Create UTC datetime for today
                        utc_dt = datetime.now(dt_timezone.utc).replace(hour=utc_hour, minute=utc_minute, second=0, microsecond=0)
                        local_dt = utc_dt.astimezone(bucharest)
                        time_str = f"{local_dt.hour:02d}:{local_dt.minute:02d}"
                    except:
                        # Fallback: show the raw UTC time if tz conversion fails.
                        time_str = f"{int(hour):02d}:{int(minute):02d}"
                else:
                    # Non-numeric fields (e.g. */5): show the raw expression.
                    time_str = expr[:15]
            else:
                time_str = expr[:15]
            # Check if ran today
            state = job.get('state', {})
            last_run = state.get('lastRunAtMs', 0)
            ran_today = last_run >= today_start_ms
            last_status = state.get('lastStatus', 'unknown')
            jobs.append({
                'id': job.get('id'),
                'name': job.get('name'),
                'agentId': job.get('agentId'),
                'time': time_str,
                'schedule': expr,
                'ranToday': ran_today,
                'lastStatus': last_status if ran_today else None,
                'lastRunAtMs': last_run,
                'nextRunAtMs': state.get('nextRunAtMs')
            })
        # Sort by time
        jobs.sort(key=lambda j: j['time'])
        self.send_json({
            'jobs': jobs,
            'total': len(jobs),
            'ranToday': sum(1 for j in jobs if j['ranToday'])
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_activity(self):
    """Aggregate activity from multiple sources: cron jobs, git commits, file changes.

    Each source is collected best-effort (failures are swallowed so one
    broken source does not empty the feed). Results are merged, sorted by
    timestamp descending, and capped at 30 items. All displayed times are
    converted to Europe/Bucharest.
    """
    from datetime import timezone as dt_timezone
    from zoneinfo import ZoneInfo
    try:
        activities = []
        bucharest = ZoneInfo('Europe/Bucharest')
        workspace = Path('/home/moltbot/clawd')
        # 1. Cron jobs ran today (via the clawdbot CLI's JSON output)
        try:
            result = subprocess.run(
                ['clawdbot', 'cron', 'list', '--json'],
                capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0:
                cron_data = json.loads(result.stdout)
                today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
                today_start_ms = today_start.timestamp() * 1000
                for job in cron_data.get('jobs', []):
                    state = job.get('state', {})
                    last_run = state.get('lastRunAtMs', 0)
                    if last_run >= today_start_ms:
                        run_time = datetime.fromtimestamp(last_run / 1000, tz=dt_timezone.utc)
                        local_time = run_time.astimezone(bucharest)
                        activities.append({
                            'type': 'cron',
                            'icon': 'clock',
                            'text': f"Job: {job.get('name', 'unknown')}",
                            'agent': job.get('agentId', 'echo'),
                            'time': local_time.strftime('%H:%M'),
                            'timestamp': last_run,
                            'status': state.get('lastStatus', 'ok')
                        })
        except:
            pass
        # 2. Git commits (last 24h)
        try:
            result = subprocess.run(
                ['git', 'log', '--oneline', '--since=24 hours ago', '--format=%H|%s|%at'],
                cwd=workspace, capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    if '|' in line:
                        # Format: full hash | subject | author timestamp (epoch s)
                        parts = line.split('|')
                        if len(parts) >= 3:
                            commit_hash, message, timestamp = parts[0], parts[1], int(parts[2])
                            commit_time = datetime.fromtimestamp(timestamp, tz=dt_timezone.utc)
                            local_time = commit_time.astimezone(bucharest)
                            activities.append({
                                'type': 'git',
                                'icon': 'git-commit',
                                'text': message[:60] + ('...' if len(message) > 60 else ''),
                                'agent': 'git',
                                'time': local_time.strftime('%H:%M'),
                                'timestamp': timestamp * 1000,
                                'commitHash': commit_hash[:8]
                            })
        except:
            pass
        # 2b. Git uncommitted files
        try:
            result = subprocess.run(
                ['git', 'status', '--short'],
                cwd=workspace, capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0 and result.stdout.strip():
                for line in result.stdout.strip().split('\n'):
                    if len(line) >= 4:
                        # Git status format: XY filename (XY = 2 chars status)
                        # Handle both "M " and " M" formats
                        status = line[:2]
                        # Find filepath - skip status chars and any spaces
                        filepath = line[2:].lstrip()
                        if not filepath:
                            continue
                        status_clean = status.strip()
                        # Romanian labels shown in the dashboard feed.
                        status_labels = {'M': 'modificat', 'A': 'adăugat', 'D': 'șters', '??': 'nou', 'R': 'redenumit'}
                        status_label = status_labels.get(status_clean, status_clean)
                        activities.append({
                            'type': 'git-file',
                            'icon': 'file-diff',
                            'text': f"{filepath}",
                            'agent': f"git ({status_label})",
                            'time': 'acum',
                            'timestamp': int(datetime.now().timestamp() * 1000),
                            'path': filepath,
                            'gitStatus': status_clean
                        })
        except:
            pass
        # 3. Recent files in memory/kb/ (last 24h)
        try:
            kb_dir = workspace / 'kb'
            cutoff = datetime.now().timestamp() - (24 * 3600)
            for md_file in kb_dir.rglob('*.md'):
                stat = md_file.stat()
                if stat.st_mtime > cutoff:
                    file_time = datetime.fromtimestamp(stat.st_mtime, tz=dt_timezone.utc)
                    local_time = file_time.astimezone(bucharest)
                    rel_path = md_file.relative_to(workspace)
                    activities.append({
                        'type': 'file',
                        'icon': 'file-text',
                        'text': f"Fișier: {md_file.name}",
                        'agent': str(rel_path.parent),
                        'time': local_time.strftime('%H:%M'),
                        'timestamp': int(stat.st_mtime * 1000),
                        'path': str(rel_path)
                    })
        except:
            pass
        # 4. Tasks from tasks.json (completed or created in the last 7 days)
        try:
            tasks_file = workspace / 'dashboard' / 'tasks.json'
            if tasks_file.exists():
                tasks_data = json.loads(tasks_file.read_text())
                for col in tasks_data.get('columns', []):
                    for task in col.get('tasks', []):
                        # Prefer the completion timestamp; fall back to creation.
                        ts_str = task.get('completed') or task.get('created', '')
                        if ts_str:
                            try:
                                ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
                                if ts.timestamp() > (datetime.now().timestamp() - 7 * 24 * 3600):
                                    local_time = ts.astimezone(bucharest)
                                    activities.append({
                                        'type': 'task',
                                        'icon': 'check-circle' if task.get('completed') else 'circle',
                                        'text': task.get('title', ''),
                                        'agent': task.get('agent', 'Echo'),
                                        'time': local_time.strftime('%d %b %H:%M'),
                                        'timestamp': int(ts.timestamp() * 1000),
                                        'status': 'done' if task.get('completed') else col['id']
                                    })
                            except:
                                pass
        except:
            pass
        # Sort by timestamp descending
        activities.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
        # Limit to 30 items
        activities = activities[:30]
        self.send_json({
            'activities': activities,
            'total': len(activities)
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_files_get(self):
    """List a directory or return file content (?path=...&action=list).

    The resolved path must live inside one of the allowed roots. Files are
    returned with their content capped at 100 KB; directories are returned
    as a listing of entries.
    """
    parsed = urlparse(self.path)
    params = parse_qs(parsed.query)
    path = params.get('path', [''])[0]
    action = params.get('action', ['list'])[0]
    # Security: only allow access within allowed directories
    allowed_dirs = [
        Path('/home/moltbot/clawd'),
        Path('/home/moltbot/workspace')
    ]
    # Resolve against each allowed root; resolve() follows symlinks, so an
    # escaping symlink is rejected by the containment check below.
    target = None
    for base in allowed_dirs:
        try:
            candidate = (base / path).resolve()
        except OSError:
            continue
        # Containment with a path-separator boundary: a plain startswith()
        # would also accept siblings such as /home/moltbot/clawd-other.
        if any(candidate == d or str(candidate).startswith(str(d) + os.sep)
               for d in allowed_dirs):
            target = candidate
            break
    if target is None:
        self.send_json({'error': 'Access denied'}, 403)
        return
    if action != 'list':
        self.send_json({'error': 'Unknown action'}, 400)
        return
    if not target.exists():
        self.send_json({'error': 'Path not found'}, 404)
        return
    if target.is_file():
        # Return file content
        try:
            content = target.read_text(encoding='utf-8', errors='replace')
            self.send_json({
                'type': 'file',
                'path': path,
                'name': target.name,
                'content': content[:100000],  # Limit to 100KB
                'size': target.stat().st_size,
                'truncated': target.stat().st_size > 100000
            })
        except Exception as e:
            self.send_json({'error': str(e)}, 500)
        return
    # List directory
    items = []
    try:
        for item in sorted(target.iterdir()):
            stat = item.stat()
            # Build relative path from original request path
            item_path = f"{path}/{item.name}" if path else item.name
            items.append({
                'name': item.name,
                'type': 'dir' if item.is_dir() else 'file',
                'size': stat.st_size if item.is_file() else None,
                'mtime': stat.st_mtime,
                'path': item_path
            })
        self.send_json({
            'type': 'dir',
            'path': path,
            'items': items
        })
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_workspace_list(self):
    """List projects in ~/workspace/ with Ralph status, git info, etc.

    For each project directory the response includes: feature flags
    (Ralph scripts, PRD, main.py, venv, README), Ralph progress parsed
    from scripts/ralph/prd.json, whether main.py is currently running,
    and a git summary. Every optional section is best-effort.
    """
    try:
        projects = []
        if not WORKSPACE_DIR.exists():
            self.send_json({'projects': []})
            return
        for project_dir in sorted(WORKSPACE_DIR.iterdir()):
            # Skip plain files and hidden directories.
            if not project_dir.is_dir() or project_dir.name.startswith('.'):
                continue
            ralph_dir = project_dir / 'scripts' / 'ralph'
            prd_json = ralph_dir / 'prd.json'
            tasks_dir = project_dir / 'tasks'
            # Base descriptor; the optional sections below fill in
            # 'ralph', 'process', 'techStack' and 'git'.
            proj = {
                'name': project_dir.name,
                'path': str(project_dir),
                'hasRalph': ralph_dir.exists(),
                'hasPrd': any(tasks_dir.glob('prd-*.md')) if tasks_dir.exists() else False,
                'hasMain': (project_dir / 'main.py').exists(),
                'hasVenv': (project_dir / 'venv').exists(),
                'hasReadme': (project_dir / 'README.md').exists(),
                'ralph': None,
                'process': {'running': False, 'pid': None, 'port': None},
                'git': None
            }
            # Ralph status
            if prd_json.exists():
                try:
                    prd = json.loads(prd_json.read_text())
                    stories = prd.get('userStories', [])
                    complete = sum(1 for s in stories if s.get('passes'))
                    # Check ralph PID
                    ralph_pid = None
                    ralph_running = False
                    pid_file = ralph_dir / '.ralph.pid'
                    if pid_file.exists():
                        try:
                            pid = int(pid_file.read_text().strip())
                            os.kill(pid, 0)  # Check if alive
                            ralph_running = True
                            ralph_pid = pid
                        except (ValueError, ProcessLookupError, PermissionError):
                            # Stale/garbled PID file: report not running.
                            pass
                    # Last iteration time from logs
                    last_iter = None
                    logs_dir = ralph_dir / 'logs'
                    if logs_dir.exists():
                        log_files = sorted(logs_dir.glob('iteration-*.log'), key=lambda f: f.stat().st_mtime, reverse=True)
                        if log_files:
                            mtime = log_files[0].stat().st_mtime
                            last_iter = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M')
                    tech = prd.get('techStack', {})
                    proj['ralph'] = {
                        'running': ralph_running,
                        'pid': ralph_pid,
                        'storiesTotal': len(stories),
                        'storiesComplete': complete,
                        'lastIteration': last_iter,
                        'stories': [
                            {'id': s.get('id', ''), 'title': s.get('title', ''), 'passes': s.get('passes', False)}
                            for s in stories
                        ]
                    }
                    proj['techStack'] = {
                        'type': tech.get('type', ''),
                        'commands': tech.get('commands', {}),
                        'port': tech.get('port'),
                    }
                except (json.JSONDecodeError, IOError):
                    pass
            # Check if main.py is running
            if proj['hasMain']:
                try:
                    result = subprocess.run(
                        ['pgrep', '-f', f'python.*{project_dir.name}/main.py'],
                        capture_output=True, text=True, timeout=3
                    )
                    if result.stdout.strip():
                        pids = result.stdout.strip().split('\n')
                        port = None
                        if prd_json.exists():
                            try:
                                prd_data = json.loads(prd_json.read_text())
                                port = prd_data.get('techStack', {}).get('port')
                            except (json.JSONDecodeError, IOError):
                                pass
                        proj['process'] = {
                            'running': True,
                            'pid': int(pids[0]),
                            'port': port
                        }
                except Exception:
                    pass
            # Git info
            if (project_dir / '.git').exists():
                try:
                    branch = subprocess.run(
                        ['git', 'branch', '--show-current'],
                        cwd=project_dir, capture_output=True, text=True, timeout=5
                    ).stdout.strip()
                    last_commit = subprocess.run(
                        ['git', 'log', '-1', '--format=%h - %s'],
                        cwd=project_dir, capture_output=True, text=True, timeout=5
                    ).stdout.strip()
                    status_out = subprocess.run(
                        ['git', 'status', '--short'],
                        cwd=project_dir, capture_output=True, text=True, timeout=5
                    ).stdout.strip()
                    uncommitted = len([l for l in status_out.split('\n') if l.strip()]) if status_out else 0
                    proj['git'] = {
                        'branch': branch,
                        'lastCommit': last_commit,
                        'uncommitted': uncommitted
                    }
                except Exception:
                    pass
            projects.append(proj)
        self.send_json({'projects': projects})
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def _read_post_json(self):
"""Helper to read JSON POST body."""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
return json.loads(post_data)
def _validate_project(self, name):
    """Validate project name and return its path, or None.

    Rejects empty names, names containing '/' or '..', non-existent or
    non-directory entries, and anything that resolves (e.g. via a
    symlink) outside WORKSPACE_DIR.
    """
    if not name or '/' in name or '..' in name:
        return None
    project_dir = WORKSPACE_DIR / name
    if not project_dir.exists() or not project_dir.is_dir():
        return None
    # Separator-boundary containment check: a plain startswith() would
    # also accept siblings such as /home/moltbot/workspace-backup.
    resolved = str(project_dir.resolve())
    root = str(WORKSPACE_DIR)
    if resolved != root and not resolved.startswith(root + os.sep):
        return None
    return project_dir
def handle_workspace_run(self):
    """Start a project process (main.py, ralph.sh, or pytest).

    POST body: {"project": <name>, "command": "main"|"ralph"|"test"}.
    'main' and 'ralph' launch detached background processes whose output
    is appended to a log file; 'test' runs pytest synchronously with a
    120-second cap and returns its output.
    """
    try:
        data = self._read_post_json()
        project_name = data.get('project', '')
        command = data.get('command', '')
        project_dir = self._validate_project(project_name)
        if not project_dir:
            self.send_json({'success': False, 'error': 'Invalid project'}, 400)
            return
        allowed_commands = {'main', 'ralph', 'test'}
        if command not in allowed_commands:
            self.send_json({'success': False, 'error': f'Invalid command. Allowed: {", ".join(allowed_commands)}'}, 400)
            return
        ralph_dir = project_dir / 'scripts' / 'ralph'
        if command == 'main':
            main_py = project_dir / 'main.py'
            if not main_py.exists():
                self.send_json({'success': False, 'error': 'No main.py found'}, 404)
                return
            # Use venv python if available
            venv_python = project_dir / 'venv' / 'bin' / 'python'
            python_cmd = str(venv_python) if venv_python.exists() else sys.executable
            log_path = ralph_dir / 'logs' / 'main.log' if ralph_dir.exists() else project_dir / 'main.log'
            log_path.parent.mkdir(parents=True, exist_ok=True)
            # start_new_session=True detaches the child into its own
            # session/process group so it outlives this request.
            with open(log_path, 'a') as log_file:
                proc = subprocess.Popen(
                    [python_cmd, 'main.py'],
                    cwd=str(project_dir),
                    stdout=log_file,
                    stderr=log_file,
                    start_new_session=True
                )
            self.send_json({'success': True, 'pid': proc.pid, 'log': str(log_path)})
        elif command == 'ralph':
            ralph_sh = ralph_dir / 'ralph.sh'
            if not ralph_sh.exists():
                self.send_json({'success': False, 'error': 'No ralph.sh found'}, 404)
                return
            log_path = ralph_dir / 'logs' / 'ralph.log'
            log_path.parent.mkdir(parents=True, exist_ok=True)
            with open(log_path, 'a') as log_file:
                proc = subprocess.Popen(
                    ['bash', str(ralph_sh)],
                    cwd=str(project_dir),
                    stdout=log_file,
                    stderr=log_file,
                    start_new_session=True
                )
            # Write PID (read back by the stop endpoint and status listing).
            pid_file = ralph_dir / '.ralph.pid'
            pid_file.write_text(str(proc.pid))
            self.send_json({'success': True, 'pid': proc.pid, 'log': str(log_path)})
        elif command == 'test':
            # Run pytest synchronously (with timeout)
            venv_python = project_dir / 'venv' / 'bin' / 'python'
            python_cmd = str(venv_python) if venv_python.exists() else sys.executable
            result = subprocess.run(
                [python_cmd, '-m', 'pytest', '-v', '--tb=short'],
                cwd=str(project_dir),
                capture_output=True, text=True,
                timeout=120
            )
            self.send_json({
                'success': result.returncode == 0,
                'output': result.stdout + result.stderr,
                'returncode': result.returncode
            })
    except subprocess.TimeoutExpired:
        self.send_json({'success': False, 'error': 'Test timeout (120s)'}, 500)
    except Exception as e:
        self.send_json({'success': False, 'error': str(e)}, 500)
def handle_workspace_stop(self):
    """Stop a project process.

    POST body: {"project": <name>, "target": "main"|"ralph"}.
    Before signalling, /proc/<pid>/cwd is checked to confirm the process
    runs inside the workspace, so arbitrary PIDs cannot be killed.
    NOTE(review): this containment test is a plain string prefix — a cwd
    like /home/moltbot/workspace2 would also pass; confirm intent.
    """
    try:
        data = self._read_post_json()
        project_name = data.get('project', '')
        target = data.get('target', '')
        project_dir = self._validate_project(project_name)
        if not project_dir:
            self.send_json({'success': False, 'error': 'Invalid project'}, 400)
            return
        if target not in ('main', 'ralph'):
            self.send_json({'success': False, 'error': 'Invalid target. Use: main, ralph'}, 400)
            return
        if target == 'ralph':
            pid_file = project_dir / 'scripts' / 'ralph' / '.ralph.pid'
            if pid_file.exists():
                try:
                    pid = int(pid_file.read_text().strip())
                    # Verify the process belongs to our user and is within workspace
                    proc_cwd = Path(f'/proc/{pid}/cwd').resolve()
                    if str(proc_cwd).startswith(str(WORKSPACE_DIR)):
                        # Signal the whole process group (ralph.sh was
                        # started with start_new_session=True).
                        os.killpg(os.getpgid(pid), signal.SIGTERM)
                        self.send_json({'success': True, 'message': f'Ralph stopped (PID {pid})'})
                    else:
                        self.send_json({'success': False, 'error': 'Process not in workspace'}, 403)
                except ProcessLookupError:
                    # PID no longer exists: report success.
                    self.send_json({'success': True, 'message': 'Process already stopped'})
                except PermissionError:
                    self.send_json({'success': False, 'error': 'Permission denied'}, 403)
            else:
                self.send_json({'success': False, 'error': 'No PID file found'}, 404)
        elif target == 'main':
            # Find main.py process for this project
            try:
                result = subprocess.run(
                    ['pgrep', '-f', f'python.*{project_dir.name}/main.py'],
                    capture_output=True, text=True, timeout=3
                )
                if result.stdout.strip():
                    # pgrep may return several PIDs; only the first is signalled.
                    pid = int(result.stdout.strip().split('\n')[0])
                    proc_cwd = Path(f'/proc/{pid}/cwd').resolve()
                    if str(proc_cwd).startswith(str(WORKSPACE_DIR)):
                        os.kill(pid, signal.SIGTERM)
                        self.send_json({'success': True, 'message': f'Main stopped (PID {pid})'})
                    else:
                        self.send_json({'success': False, 'error': 'Process not in workspace'}, 403)
                else:
                    self.send_json({'success': True, 'message': 'No running process found'})
            except Exception as e:
                self.send_json({'success': False, 'error': str(e)}, 500)
    except Exception as e:
        self.send_json({'success': False, 'error': str(e)}, 500)
def handle_workspace_git_diff(self):
    """Return combined staged + unstaged git diff for a workspace project."""
    try:
        params = parse_qs(urlparse(self.path).query)
        project_name = params.get('project', [''])[0]
        project_dir = self._validate_project(project_name)
        if not project_dir:
            self.send_json({'error': 'Invalid project'}, 400)
            return
        if not (project_dir / '.git').exists():
            self.send_json({'error': 'Not a git repository'}, 400)
            return

        def git(*args):
            # Run a git subcommand in the project and return raw stdout.
            return subprocess.run(
                ['git', *args],
                cwd=str(project_dir), capture_output=True, text=True, timeout=10
            ).stdout

        status = git('status', '--short').strip()
        unstaged = git('diff')
        staged = git('diff', '--cached')
        # Assemble labelled sections, staged first.
        sections = []
        if staged:
            sections.append('=== Staged Changes ===\n' + staged)
        if unstaged:
            sections.append('=== Unstaged Changes ===\n' + unstaged)
        self.send_json({
            'project': project_name,
            'status': status,
            'diff': '\n'.join(sections),
            'hasDiff': bool(status)
        })
    except subprocess.TimeoutExpired:
        self.send_json({'error': 'Timeout'}, 500)
    except Exception as e:
        self.send_json({'error': str(e)}, 500)
def handle_workspace_git_commit(self):
"""Commit all changes in a workspace project."""
try:
data = self._read_post_json()
project_name = data.get('project', '')
message = data.get('message', '').strip()
project_dir = self._validate_project(project_name)
if not project_dir:
self.send_json({'success': False, 'error': 'Invalid project'}, 400)
return
if not (project_dir / '.git').exists():
self.send_json({'success': False, 'error': 'Not a git repository'}, 400)
return
# Check if there's anything to commit
porcelain = subprocess.run(
['git', 'status', '--porcelain'],
cwd=str(project_dir), capture_output=True, text=True, timeout=10
).stdout.strip()
if not porcelain:
self.send_json({'success': False, 'error': 'Nothing to commit'})
return
files_changed = len([l for l in porcelain.split('\n') if l.strip()])
# Auto-message if empty
if not message:
now = datetime.now().strftime('%Y-%m-%d %H:%M')
message = f'Update: {now} ({files_changed} files)'
# Stage all and commit
subprocess.run(
['git', 'add', '-A'],
cwd=str(project_dir), capture_output=True, text=True, timeout=10
)
result = subprocess.run(
['git', 'commit', '-m', message],
cwd=str(project_dir), capture_output=True, text=True, timeout=30
)
output = result.stdout + result.stderr
if result.returncode == 0:
self.send_json({
'success': True,
'message': message,
'output': output,
'filesChanged': files_changed
})
else:
self.send_json({'success': False, 'error': output or 'Commit failed'})
except subprocess.TimeoutExpired:
self.send_json({'success': False, 'error': 'Timeout'}, 500)
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def _ensure_gitea_remote(self, project_dir, project_name):
"""Create Gitea repo and add remote if no origin exists. Returns (ok, message)."""
import urllib.request
if not GITEA_TOKEN:
return False, 'GITEA_TOKEN not set'
# Create repo via Gitea API
api_url = f'{GITEA_URL}/api/v1/orgs/{GITEA_ORG}/repos'
payload = json.dumps({'name': project_name, 'private': True, 'auto_init': False}).encode()
req = urllib.request.Request(api_url, data=payload, method='POST', headers={
'Authorization': f'token {GITEA_TOKEN}',
'Content-Type': 'application/json'
})
try:
resp = urllib.request.urlopen(req, timeout=15)
resp.read()
except urllib.error.HTTPError as e:
body = e.read().decode(errors='replace')
if e.code == 409:
pass # repo already exists, fine
else:
return False, f'Gitea API error {e.code}: {body}'
# Add remote with token auth
remote_url = f'{GITEA_URL}/{GITEA_ORG}/{project_name}.git'
# Insert token into URL for push auth
auth_url = remote_url.replace('https://', f'https://gitea:{GITEA_TOKEN}@')
subprocess.run(
['git', 'remote', 'add', 'origin', auth_url],
cwd=str(project_dir), capture_output=True, text=True, timeout=5
)
return True, f'Created repo {GITEA_ORG}/{project_name}'
def handle_workspace_git_push(self):
"""Push a workspace project to its remote, creating Gitea repo if needed."""
try:
data = self._read_post_json()
project_name = data.get('project', '')
project_dir = self._validate_project(project_name)
if not project_dir:
self.send_json({'success': False, 'error': 'Invalid project'}, 400)
return
if not (project_dir / '.git').exists():
self.send_json({'success': False, 'error': 'Not a git repository'}, 400)
return
created_msg = ''
# Check remote exists, create if not
remote_check = subprocess.run(
['git', 'remote', 'get-url', 'origin'],
cwd=str(project_dir), capture_output=True, text=True, timeout=10
)
if remote_check.returncode != 0:
ok, msg = self._ensure_gitea_remote(project_dir, project_name)
if not ok:
self.send_json({'success': False, 'error': msg})
return
created_msg = msg + '\n'
# Push (set upstream on first push)
result = subprocess.run(
['git', 'push', '-u', 'origin', 'HEAD'],
cwd=str(project_dir), capture_output=True, text=True, timeout=60
)
output = result.stdout + result.stderr
if result.returncode == 0:
self.send_json({'success': True, 'output': created_msg + (output or 'Pushed successfully')})
else:
self.send_json({'success': False, 'error': output or 'Push failed'})
except subprocess.TimeoutExpired:
self.send_json({'success': False, 'error': 'Push timeout (60s)'}, 500)
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def handle_workspace_delete(self):
"""Delete a workspace project."""
try:
data = self._read_post_json()
project_name = data.get('project', '')
confirm = data.get('confirm', '')
project_dir = self._validate_project(project_name)
if not project_dir:
self.send_json({'success': False, 'error': 'Invalid project'}, 400)
return
if confirm != project_name:
self.send_json({'success': False, 'error': 'Confirmation does not match project name'}, 400)
return
# Check for running processes
try:
result = subprocess.run(
['pgrep', '-f', f'{project_dir.name}/(main\\.py|ralph)'],
capture_output=True, text=True, timeout=5
)
if result.stdout.strip():
self.send_json({'success': False, 'error': 'Project has running processes. Stop them first.'})
return
except subprocess.TimeoutExpired:
pass
shutil.rmtree(str(project_dir))
self.send_json({
'success': True,
'message': f'Project {project_name} deleted'
})
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def handle_workspace_logs(self):
"""Get last N lines from a project log."""
try:
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
project_name = params.get('project', [''])[0]
log_type = params.get('type', ['ralph'])[0]
lines_count = min(int(params.get('lines', ['100'])[0]), 500)
project_dir = self._validate_project(project_name)
if not project_dir:
self.send_json({'error': 'Invalid project'}, 400)
return
ralph_dir = project_dir / 'scripts' / 'ralph'
# Determine log file
if log_type == 'ralph':
log_file = ralph_dir / 'logs' / 'ralph.log'
if not log_file.exists():
# Try ralph-test.log
log_file = ralph_dir / 'logs' / 'ralph-test.log'
elif log_type == 'main':
log_file = ralph_dir / 'logs' / 'main.log' if ralph_dir.exists() else project_dir / 'main.log'
elif log_type == 'progress':
log_file = ralph_dir / 'progress.txt'
else:
# Try iteration log
if log_type.startswith('iteration-'):
log_file = ralph_dir / 'logs' / f'{log_type}.log'
else:
self.send_json({'error': 'Invalid log type'}, 400)
return
if not log_file.exists():
self.send_json({
'project': project_name,
'type': log_type,
'lines': [],
'total': 0
})
return
# Security: ensure path is within workspace
if not str(log_file.resolve()).startswith(str(WORKSPACE_DIR)):
self.send_json({'error': 'Access denied'}, 403)
return
content = log_file.read_text(encoding='utf-8', errors='replace')
all_lines = content.split('\n')
total = len(all_lines)
last_lines = all_lines[-lines_count:] if len(all_lines) > lines_count else all_lines
self.send_json({
'project': project_name,
'type': log_type,
'lines': last_lines,
'total': total
})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_youtube(self):
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
data = json.loads(post_data)
url = data.get('url', '').strip()
if not url or 'youtube.com' not in url and 'youtu.be' not in url:
self.send_json({'error': 'URL YouTube invalid'}, 400)
return
# Process synchronously (simpler, avoids fork issues)
try:
print(f"Processing YouTube URL: {url}")
result = process_youtube(url)
print(f"Processing result: {result}")
self.send_json({
'status': 'done',
'message': 'Notița a fost creată! Refresh pagina Notes.'
})
except Exception as e:
import traceback
print(f"YouTube processing error: {e}")
traceback.print_exc()
self.send_json({
'status': 'error',
'message': f'Eroare: {str(e)}'
}, 500)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_get(self):
"""Get all habits with enriched stats."""
try:
# Read habits file
if not HABITS_FILE.exists():
self.send_json([])
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
habits = data.get('habits', [])
# Enrich each habit with calculated stats
enriched_habits = []
for habit in habits:
# Calculate stats using helpers
current_streak = habits_helpers.calculate_streak(habit)
best_streak = habit.get('streak', {}).get('best', 0)
completion_rate = habits_helpers.get_completion_rate(habit, days=30)
weekly_summary = habits_helpers.get_weekly_summary(habit)
# Add stats to habit
enriched = habit.copy()
enriched['current_streak'] = current_streak
enriched['best_streak'] = best_streak
enriched['completion_rate_30d'] = completion_rate
enriched['weekly_summary'] = weekly_summary
enriched['should_check_today'] = habits_helpers.should_check_today(habit)
enriched_habits.append(enriched)
# Sort by priority ascending (lower number = higher priority)
enriched_habits.sort(key=lambda h: h.get('priority', 999))
self.send_json(enriched_habits)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_post(self):
"""Create a new habit."""
try:
# Read request body
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
data = json.loads(post_data)
# Validate required fields
name = data.get('name', '').strip()
if not name:
self.send_json({'error': 'name is required'}, 400)
return
if len(name) > 100:
self.send_json({'error': 'name must be max 100 characters'}, 400)
return
# Validate color (hex format)
color = data.get('color', '#3b82f6')
if color and not re.match(r'^#[0-9A-Fa-f]{6}$', color):
self.send_json({'error': 'color must be valid hex format (#RRGGBB)'}, 400)
return
# Validate frequency type
frequency_type = data.get('frequency', {}).get('type', 'daily')
valid_types = ['daily', 'specific_days', 'x_per_week', 'weekly', 'monthly', 'custom']
if frequency_type not in valid_types:
self.send_json({'error': f'frequency.type must be one of: {", ".join(valid_types)}'}, 400)
return
# Create new habit
habit_id = str(uuid.uuid4())
now = datetime.now().isoformat()
new_habit = {
'id': habit_id,
'name': name,
'category': data.get('category', 'other'),
'color': color,
'icon': data.get('icon', 'check-circle'),
'priority': data.get('priority', 5),
'notes': data.get('notes', ''),
'reminderTime': data.get('reminderTime', ''),
'frequency': data.get('frequency', {'type': 'daily'}),
'streak': {
'current': 0,
'best': 0,
'lastCheckIn': None
},
'lives': 3,
'completions': [],
'createdAt': now,
'updatedAt': now
}
# Read existing habits
if HABITS_FILE.exists():
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
else:
habits_data = {'lastUpdated': '', 'habits': []}
# Add new habit
habits_data['habits'].append(new_habit)
habits_data['lastUpdated'] = now
# Save to file
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Return created habit with 201 status
self.send_json(new_habit, 201)
except json.JSONDecodeError:
self.send_json({'error': 'Invalid JSON'}, 400)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_put(self):
"""Update an existing habit."""
try:
# Extract habit ID from path
path_parts = self.path.split('/')
if len(path_parts) < 4:
self.send_json({'error': 'Invalid path'}, 400)
return
habit_id = path_parts[3]
# Read request body
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
data = json.loads(post_data)
# Read existing habits
if not HABITS_FILE.exists():
self.send_json({'error': 'Habit not found'}, 404)
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
# Find habit to update
habits = habits_data.get('habits', [])
habit_index = None
for i, habit in enumerate(habits):
if habit['id'] == habit_id:
habit_index = i
break
if habit_index is None:
self.send_json({'error': 'Habit not found'}, 404)
return
# Validate allowed fields
allowed_fields = ['name', 'category', 'color', 'icon', 'priority', 'notes', 'frequency', 'reminderTime']
# Validate name if provided
if 'name' in data:
name = data['name'].strip()
if not name:
self.send_json({'error': 'name cannot be empty'}, 400)
return
if len(name) > 100:
self.send_json({'error': 'name must be max 100 characters'}, 400)
return
# Validate color if provided
if 'color' in data:
color = data['color']
if color and not re.match(r'^#[0-9A-Fa-f]{6}$', color):
self.send_json({'error': 'color must be valid hex format (#RRGGBB)'}, 400)
return
# Validate frequency type if provided
if 'frequency' in data:
frequency_type = data.get('frequency', {}).get('type', 'daily')
valid_types = ['daily', 'specific_days', 'x_per_week', 'weekly', 'monthly', 'custom']
if frequency_type not in valid_types:
self.send_json({'error': f'frequency.type must be one of: {", ".join(valid_types)}'}, 400)
return
# Update only allowed fields
habit = habits[habit_index]
for field in allowed_fields:
if field in data:
habit[field] = data[field]
# Update timestamp
habit['updatedAt'] = datetime.now().isoformat()
# Save to file
habits_data['lastUpdated'] = datetime.now().isoformat()
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Return updated habit
self.send_json(habit)
except json.JSONDecodeError:
self.send_json({'error': 'Invalid JSON'}, 400)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_delete(self):
"""Delete a habit."""
try:
# Extract habit ID from path
path_parts = self.path.split('/')
if len(path_parts) < 4:
self.send_json({'error': 'Invalid path'}, 400)
return
habit_id = path_parts[3]
# Read existing habits
if not HABITS_FILE.exists():
self.send_json({'error': 'Habit not found'}, 404)
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
# Find and remove habit
habits = habits_data.get('habits', [])
habit_found = False
for i, habit in enumerate(habits):
if habit['id'] == habit_id:
habits.pop(i)
habit_found = True
break
if not habit_found:
self.send_json({'error': 'Habit not found'}, 404)
return
# Save to file
habits_data['lastUpdated'] = datetime.now().isoformat()
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Return 204 No Content
self.send_response(204)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_check(self):
"""Check in on a habit (complete it for today)."""
try:
# Extract habit ID from path (/api/habits/{id}/check)
path_parts = self.path.split('/')
if len(path_parts) < 5:
self.send_json({'error': 'Invalid path'}, 400)
return
habit_id = path_parts[3]
# Read optional body (note, rating, mood)
body_data = {}
content_length = self.headers.get('Content-Length')
if content_length:
post_data = self.rfile.read(int(content_length)).decode('utf-8')
if post_data.strip():
try:
body_data = json.loads(post_data)
except json.JSONDecodeError:
self.send_json({'error': 'Invalid JSON'}, 400)
return
# Read existing habits
if not HABITS_FILE.exists():
self.send_json({'error': 'Habit not found'}, 404)
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
# Find habit
habit = None
for h in habits_data.get('habits', []):
if h['id'] == habit_id:
habit = h
break
if not habit:
self.send_json({'error': 'Habit not found'}, 404)
return
# Verify habit is relevant for today
if not habits_helpers.should_check_today(habit):
self.send_json({'error': 'Habit is not relevant for today based on its frequency'}, 400)
return
# Verify not already checked today
today = datetime.now().date().isoformat()
completions = habit.get('completions', [])
for completion in completions:
if completion.get('date') == today:
self.send_json({'error': 'Habit already checked in today'}, 409)
return
# Create completion entry
completion_entry = {
'date': today,
'type': 'check' # Distinguish from 'skip' for life restore logic
}
# Add optional fields
if 'note' in body_data:
completion_entry['note'] = body_data['note']
if 'rating' in body_data:
rating = body_data['rating']
if not isinstance(rating, int) or rating < 1 or rating > 5:
self.send_json({'error': 'rating must be an integer between 1 and 5'}, 400)
return
completion_entry['rating'] = rating
if 'mood' in body_data:
mood = body_data['mood']
if mood not in ['happy', 'neutral', 'sad']:
self.send_json({'error': 'mood must be one of: happy, neutral, sad'}, 400)
return
completion_entry['mood'] = mood
# Add completion to habit
habit['completions'].append(completion_entry)
# Recalculate streak
current_streak = habits_helpers.calculate_streak(habit)
habit['streak']['current'] = current_streak
# Update best streak if current is higher
if current_streak > habit['streak']['best']:
habit['streak']['best'] = current_streak
# Update lastCheckIn
habit['streak']['lastCheckIn'] = today
# Check for weekly lives recovery (+1 life if ≥1 check-in in previous week)
new_lives, was_awarded = habits_helpers.check_and_award_weekly_lives(habit)
lives_awarded_this_checkin = False
if was_awarded:
habit['lives'] = new_lives
habit['lastLivesAward'] = today
lives_awarded_this_checkin = True
# Update timestamp
habit['updatedAt'] = datetime.now().isoformat()
habits_data['lastUpdated'] = habit['updatedAt']
# Save to file
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Enrich habit with calculated stats before returning
current_streak = habits_helpers.calculate_streak(habit)
best_streak = habit.get('streak', {}).get('best', 0)
completion_rate = habits_helpers.get_completion_rate(habit, days=30)
weekly_summary = habits_helpers.get_weekly_summary(habit)
enriched_habit = habit.copy()
enriched_habit['current_streak'] = current_streak
enriched_habit['best_streak'] = best_streak
enriched_habit['completion_rate_30d'] = completion_rate
enriched_habit['weekly_summary'] = weekly_summary
enriched_habit['should_check_today'] = habits_helpers.should_check_today(habit)
enriched_habit['livesAwarded'] = lives_awarded_this_checkin
# Return enriched habit
self.send_json(enriched_habit, 200)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_uncheck(self):
"""Uncheck a habit (remove completion for a specific date)."""
try:
# Extract habit ID from path (/api/habits/{id}/check)
path_parts = self.path.split('?')[0].split('/')
if len(path_parts) < 5:
self.send_json({'error': 'Invalid path'}, 400)
return
habit_id = path_parts[3]
# Parse query string for date parameter
parsed = urlparse(self.path)
query_params = parse_qs(parsed.query)
# Get date from query string (required)
if 'date' not in query_params:
self.send_json({'error': 'date parameter is required (format: YYYY-MM-DD)'}, 400)
return
target_date = query_params['date'][0]
# Validate date format
try:
datetime.fromisoformat(target_date)
except ValueError:
self.send_json({'error': 'Invalid date format. Use YYYY-MM-DD'}, 400)
return
# Read existing habits
if not HABITS_FILE.exists():
self.send_json({'error': 'Habit not found'}, 404)
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
# Find habit
habit = None
for h in habits_data.get('habits', []):
if h['id'] == habit_id:
habit = h
break
if not habit:
self.send_json({'error': 'Habit not found'}, 404)
return
# Find and remove the completion for the specified date
completions = habit.get('completions', [])
completion_found = False
for i, completion in enumerate(completions):
if completion.get('date') == target_date:
completions.pop(i)
completion_found = True
break
if not completion_found:
self.send_json({'error': 'No completion found for the specified date'}, 404)
return
# Recalculate streak after removing completion
current_streak = habits_helpers.calculate_streak(habit)
habit['streak']['current'] = current_streak
# Update best streak if needed (best never decreases, but we keep it for consistency)
if current_streak > habit['streak']['best']:
habit['streak']['best'] = current_streak
# Update timestamp
habit['updatedAt'] = datetime.now().isoformat()
habits_data['lastUpdated'] = habit['updatedAt']
# Save to file
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Enrich habit with calculated stats before returning
best_streak = habit.get('streak', {}).get('best', 0)
completion_rate = habits_helpers.get_completion_rate(habit, days=30)
weekly_summary = habits_helpers.get_weekly_summary(habit)
enriched_habit = habit.copy()
enriched_habit['current_streak'] = current_streak
enriched_habit['best_streak'] = best_streak
enriched_habit['completion_rate_30d'] = completion_rate
enriched_habit['weekly_summary'] = weekly_summary
enriched_habit['should_check_today'] = habits_helpers.should_check_today(habit)
# Return enriched habit
self.send_json(enriched_habit, 200)
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_habits_skip(self):
"""Skip a day using a life to preserve streak."""
try:
# Extract habit ID from path (/api/habits/{id}/skip)
path_parts = self.path.split('/')
if len(path_parts) < 5:
self.send_json({'error': 'Invalid path'}, 400)
return
habit_id = path_parts[3]
# Read existing habits
if not HABITS_FILE.exists():
self.send_json({'error': 'Habit not found'}, 404)
return
with open(HABITS_FILE, 'r', encoding='utf-8') as f:
habits_data = json.load(f)
# Find habit
habit = None
for h in habits_data.get('habits', []):
if h['id'] == habit_id:
habit = h
break
if not habit:
self.send_json({'error': 'Habit not found'}, 404)
return
# Verify lives > 0
current_lives = habit.get('lives', 3)
if current_lives <= 0:
self.send_json({'error': 'No lives remaining'}, 400)
return
# Decrement lives by 1
habit['lives'] = current_lives - 1
# Add completion entry with type='skip'
today = datetime.now().date().isoformat()
completion_entry = {
'date': today,
'type': 'skip'
}
habit['completions'].append(completion_entry)
# Update timestamp
habit['updatedAt'] = datetime.now().isoformat()
habits_data['lastUpdated'] = habit['updatedAt']
# Save to file
with open(HABITS_FILE, 'w', encoding='utf-8') as f:
json.dump(habits_data, f, indent=2)
# Enrich habit with calculated stats before returning
current_streak = habits_helpers.calculate_streak(habit)
best_streak = habit.get('streak', {}).get('best', 0)
completion_rate = habits_helpers.get_completion_rate(habit, days=30)
weekly_summary = habits_helpers.get_weekly_summary(habit)
enriched_habit = habit.copy()
enriched_habit['current_streak'] = current_streak
enriched_habit['best_streak'] = best_streak
enriched_habit['completion_rate_30d'] = completion_rate
enriched_habit['weekly_summary'] = weekly_summary
enriched_habit['should_check_today'] = habits_helpers.should_check_today(habit)
# Return enriched habit
self.send_json(enriched_habit, 200)
except Exception as e:
self.send_json({'error': str(e)}, 500)
# ── Eco (echo-core) handlers ──────────────────────────────────────
def handle_eco_status(self):
"""Get status of echo-core services + active sessions."""
try:
services = []
for svc in ECO_SERVICES:
info = {'name': svc, 'active': False, 'pid': None, 'uptime': None, 'memory': None}
result = subprocess.run(
['systemctl', '--user', 'is-active', svc],
capture_output=True, text=True, timeout=5
)
info['active'] = result.stdout.strip() == 'active'
if info['active']:
# PID
result = subprocess.run(
['systemctl', '--user', 'show', '-p', 'MainPID', '--value', svc],
capture_output=True, text=True, timeout=5
)
pid = result.stdout.strip()
if pid and pid != '0':
info['pid'] = int(pid)
# Uptime via systemctl timestamp
try:
r = subprocess.run(
['systemctl', '--user', 'show', '-p', 'ActiveEnterTimestamp', '--value', svc],
capture_output=True, text=True, timeout=5
)
ts = r.stdout.strip()
if ts:
start = datetime.strptime(ts, '%a %Y-%m-%d %H:%M:%S %Z')
info['uptime'] = int((datetime.utcnow() - start).total_seconds())
except Exception:
pass
# Memory (VmRSS from /proc)
try:
for line in Path(f'/proc/{pid}/status').read_text().splitlines():
if line.startswith('VmRSS:'):
info['memory'] = line.split(':')[1].strip()
break
except Exception:
pass
services.append(info)
self.send_json({'services': services})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def _eco_channel_map(self):
"""Build channel_id -> {name, platform, is_group} from config.json."""
config_file = ECHO_CORE_DIR / 'config.json'
m = {}
try:
cfg = json.loads(config_file.read_text())
for name, ch in cfg.get('channels', {}).items():
m[str(ch['id'])] = {'name': name, 'platform': 'discord'}
for name, ch in cfg.get('telegram_channels', {}).items():
m[str(ch['id'])] = {'name': name, 'platform': 'telegram'}
for name, ch in cfg.get('whatsapp_channels', {}).items():
m[str(ch['id'])] = {'name': name, 'platform': 'whatsapp', 'is_group': True}
for admin_id in cfg.get('bot', {}).get('admins', []):
m.setdefault(str(admin_id), {'name': f'TG DM', 'platform': 'telegram'})
wa_owner = cfg.get('whatsapp', {}).get('owner', '')
if wa_owner:
m.setdefault(f'wa-{wa_owner}', {'name': 'WA Owner', 'platform': 'whatsapp'})
except Exception:
pass
return m
def _eco_enrich_sessions(self):
"""Return enriched sessions list sorted by last_message_at desc."""
raw = {}
if ECHO_SESSIONS_FILE.exists():
try:
raw = json.loads(ECHO_SESSIONS_FILE.read_text())
except Exception:
pass
cmap = self._eco_channel_map()
sessions = []
if isinstance(raw, dict):
for ch_id, sdata in raw.items():
if 'MagicMock' in ch_id:
continue
entry = dict(sdata) if isinstance(sdata, dict) else {}
entry['channel_id'] = ch_id
if ch_id in cmap:
entry['platform'] = cmap[ch_id]['platform']
entry['channel_name'] = cmap[ch_id]['name']
entry['is_group'] = cmap[ch_id].get('is_group', False)
elif ch_id.startswith('wa-') or '@g.us' in ch_id or '@s.whatsapp.net' in ch_id:
entry['platform'] = 'whatsapp'
entry['is_group'] = '@g.us' in ch_id
entry['channel_name'] = ('WA Grup' if entry['is_group'] else 'WA DM')
elif ch_id.isdigit() and len(ch_id) >= 17:
entry['platform'] = 'discord'
entry['channel_name'] = 'Discord #' + ch_id[-6:]
elif ch_id.isdigit():
entry['platform'] = 'telegram'
entry['channel_name'] = 'TG ' + ch_id
else:
entry['platform'] = 'unknown'
entry['channel_name'] = ch_id[:20]
sessions.append(entry)
sessions.sort(key=lambda s: s.get('last_message_at', ''), reverse=True)
return sessions
def handle_eco_sessions(self):
"""Return enriched sessions list."""
try:
self.send_json({'sessions': self._eco_enrich_sessions()})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_eco_session_content(self):
"""Return conversation messages from a session transcript."""
try:
params = parse_qs(urlparse(self.path).query)
session_id = params.get('id', [''])[0]
if not session_id or '/' in session_id or '..' in session_id:
self.send_json({'error': 'Invalid session id'}, 400)
return
transcript = Path.home() / '.claude' / 'projects' / '-home-moltbot-echo-core' / f'{session_id}.jsonl'
if not transcript.exists():
self.send_json({'messages': [], 'error': 'Transcript not found'})
return
messages = []
for line in transcript.read_text().splitlines():
try:
d = json.loads(line)
except Exception:
continue
t = d.get('type', '')
if t == 'user':
msg = d.get('message', {})
content = msg.get('content', '')
if isinstance(content, str):
# Strip [EXTERNAL CONTENT] wrappers
text = content.replace('[EXTERNAL CONTENT]\n', '').replace('\n[END EXTERNAL CONTENT]', '').strip()
if text:
messages.append({'role': 'user', 'text': text[:2000]})
elif t == 'assistant':
msg = d.get('message', {})
content = msg.get('content', '')
if isinstance(content, list):
parts = []
for block in content:
if block.get('type') == 'text':
parts.append(block['text'])
text = '\n'.join(parts).strip()
if text:
messages.append({'role': 'assistant', 'text': text[:2000]})
elif isinstance(content, str) and content.strip():
messages.append({'role': 'assistant', 'text': content[:2000]})
self.send_json({'messages': messages})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_eco_restart(self):
"""Restart an echo-core service (not taskboard)."""
try:
data = self._read_post_json()
svc = data.get('service', '')
if svc not in ECO_SERVICES:
self.send_json({'success': False, 'error': f'Unknown service: {svc}'}, 400)
return
if svc == 'echo-taskboard':
self.send_json({'success': False, 'error': 'Cannot restart taskboard from itself'}, 400)
return
result = subprocess.run(
['systemctl', '--user', 'restart', svc],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
self.send_json({'success': True, 'message': f'{svc} restarted'})
else:
self.send_json({'success': False, 'error': result.stderr.strip()}, 500)
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def handle_eco_stop(self):
"""Stop an echo-core service (not taskboard)."""
try:
data = self._read_post_json()
svc = data.get('service', '')
if svc not in ECO_SERVICES:
self.send_json({'success': False, 'error': f'Unknown service: {svc}'}, 400)
return
if svc == 'echo-taskboard':
self.send_json({'success': False, 'error': 'Cannot stop taskboard from itself'}, 400)
return
result = subprocess.run(
['systemctl', '--user', 'stop', svc],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
self.send_json({'success': True, 'message': f'{svc} stopped'})
else:
self.send_json({'success': False, 'error': result.stderr.strip()}, 500)
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def handle_eco_logs(self):
"""Return last N lines from echo-core.log."""
try:
params = parse_qs(urlparse(self.path).query)
lines = min(int(params.get('lines', ['100'])[0]), 500)
if not ECHO_LOG_FILE.exists():
self.send_json({'lines': ['(log file not found)']})
return
result = subprocess.run(
['tail', '-n', str(lines), str(ECHO_LOG_FILE)],
capture_output=True, text=True, timeout=10
)
self.send_json({'lines': result.stdout.splitlines()})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_eco_doctor(self):
"""Run health checks on echo-core ecosystem."""
checks = []
# 1. Services
for svc in ECO_SERVICES:
try:
r = subprocess.run(
['systemctl', '--user', 'is-active', svc],
capture_output=True, text=True, timeout=5
)
active = r.stdout.strip() == 'active'
checks.append({
'name': f'Service: {svc}',
'pass': active,
'detail': 'active' if active else r.stdout.strip()
})
except Exception as e:
checks.append({'name': f'Service: {svc}', 'pass': False, 'detail': str(e)})
# 2. Disk space
try:
st = shutil.disk_usage('/')
pct_free = (st.free / st.total) * 100
checks.append({
'name': 'Disk space',
'pass': pct_free > 5,
'detail': f'{pct_free:.1f}% free ({st.free // (1024**3)} GB)'
})
except Exception as e:
checks.append({'name': 'Disk space', 'pass': False, 'detail': str(e)})
# 3. Log file
try:
if ECHO_LOG_FILE.exists():
size = ECHO_LOG_FILE.stat().st_size
size_mb = size / (1024 * 1024)
checks.append({
'name': 'Log file',
'pass': size_mb < 100,
'detail': f'{size_mb:.1f} MB'
})
else:
checks.append({'name': 'Log file', 'pass': False, 'detail': 'Not found'})
except Exception as e:
checks.append({'name': 'Log file', 'pass': False, 'detail': str(e)})
# 4. Sessions file
try:
if ECHO_SESSIONS_FILE.exists():
data = json.loads(ECHO_SESSIONS_FILE.read_text())
count = len(data) if isinstance(data, list) else len(data.keys()) if isinstance(data, dict) else 0
checks.append({'name': 'Sessions file', 'pass': True, 'detail': f'{count} active'})
else:
checks.append({'name': 'Sessions file', 'pass': False, 'detail': 'Not found'})
except Exception as e:
checks.append({'name': 'Sessions file', 'pass': False, 'detail': str(e)})
# 5. Config
config_file = ECHO_CORE_DIR / 'config.json'
try:
if config_file.exists():
json.loads(config_file.read_text())
checks.append({'name': 'Config', 'pass': True, 'detail': 'Valid JSON'})
else:
checks.append({'name': 'Config', 'pass': False, 'detail': 'Not found'})
except Exception as e:
checks.append({'name': 'Config', 'pass': False, 'detail': str(e)})
# 6. WhatsApp bridge log
wa_log = ECHO_CORE_DIR / 'logs' / 'whatsapp-bridge.log'
try:
if wa_log.exists():
# Check last line for errors
r = subprocess.run(
['tail', '-1', str(wa_log)],
capture_output=True, text=True, timeout=5
)
last = r.stdout.strip()
has_error = 'error' in last.lower() or 'fatal' in last.lower()
checks.append({
'name': 'WhatsApp bridge log',
'pass': not has_error,
'detail': last[:80] if last else 'Empty'
})
else:
checks.append({'name': 'WhatsApp bridge log', 'pass': False, 'detail': 'Not found'})
except Exception as e:
checks.append({'name': 'WhatsApp bridge log', 'pass': False, 'detail': str(e)})
# 7. Claude CLI
try:
r = subprocess.run(
['which', 'claude'],
capture_output=True, text=True, timeout=5
)
found = r.returncode == 0
checks.append({
'name': 'Claude CLI',
'pass': found,
'detail': r.stdout.strip() if found else 'Not in PATH'
})
except Exception as e:
checks.append({'name': 'Claude CLI', 'pass': False, 'detail': str(e)})
self.send_json({'checks': checks})
def handle_eco_sessions_clear(self):
"""Clear active sessions (all or specific channel)."""
try:
data = self._read_post_json()
channel = data.get('channel', None)
if not ECHO_SESSIONS_FILE.exists():
self.send_json({'success': True, 'message': 'No sessions file'})
return
if channel:
# Remove specific channel
sessions = json.loads(ECHO_SESSIONS_FILE.read_text())
if isinstance(sessions, list):
sessions = [s for s in sessions if s.get('channel') != channel]
elif isinstance(sessions, dict):
sessions.pop(channel, None)
ECHO_SESSIONS_FILE.write_text(json.dumps(sessions, indent=2))
self.send_json({'success': True, 'message': f'Cleared session: {channel}'})
else:
# Clear all
if isinstance(json.loads(ECHO_SESSIONS_FILE.read_text()), list):
ECHO_SESSIONS_FILE.write_text('[]')
else:
ECHO_SESSIONS_FILE.write_text('{}')
self.send_json({'success': True, 'message': 'All sessions cleared'})
except Exception as e:
self.send_json({'success': False, 'error': str(e)}, 500)
def send_json(self, data, code=200):
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def process_youtube(url):
    """Fetch subtitles for *url* and save a transcript note; return its filename.

    Uses yt-dlp to read video metadata (title, duration) and download the
    auto-generated English subtitles, converts the VTT to plain text via
    clean_vtt(), and writes a markdown note into NOTES_DIR. The notes index
    is refreshed afterwards.

    Returns the note filename on success, or None when metadata or
    subtitles could not be fetched. May raise subprocess.TimeoutExpired
    if yt-dlp hangs past the timeouts.
    """
    yt_dlp = os.path.expanduser('~/.local/bin/yt-dlp')

    # Fetch metadata only — no media download.
    result = subprocess.run(
        [yt_dlp, '--dump-json', '--no-download', url],
        capture_output=True, text=True, timeout=30
    )
    if result.returncode != 0:
        print(f"Failed to get video info: {result.stderr}")
        return None
    info = json.loads(result.stdout)
    title = info.get('title', 'Unknown')
    duration = info.get('duration', 0)

    # Download auto-generated English subtitles into a scratch directory,
    # clearing leftovers from previous runs first.
    temp_dir = Path('/tmp/yt_subs')
    temp_dir.mkdir(exist_ok=True)
    for f in temp_dir.glob('*'):
        f.unlink()
    subprocess.run([
        yt_dlp, '--write-auto-subs', '--sub-langs', 'en',
        '--skip-download', '--sub-format', 'vtt',
        '-o', str(temp_dir / '%(id)s'),
        url
    ], capture_output=True, timeout=120)

    # Use the first VTT file found (at most one is expected).
    transcript = None
    for sub_file in temp_dir.glob('*.vtt'):
        content = sub_file.read_text(encoding='utf-8', errors='replace')
        transcript = clean_vtt(content)
        break
    if not transcript:
        print("No subtitles found")
        return None

    # Date-prefixed, slugified filename for the note.
    date_str = datetime.now().strftime('%Y-%m-%d')
    slug = re.sub(r'[^\w\s-]', '', title.lower())[:50].strip().replace(' ', '-')
    filename = f"{date_str}_{slug}.md"

    # Simple note carrying the raw transcript; AI summary is added later.
    note_content = f"""# {title}
**Video:** {url}
**Duration:** {duration // 60}:{duration % 60:02d}
**Saved:** {date_str}
**Tags:** #youtube #to-summarize
---
## Transcript
{transcript[:15000]}
---
*Notă: Sumarizarea va fi adăugată de Echo.*
"""
    NOTES_DIR.mkdir(parents=True, exist_ok=True)
    note_path = NOTES_DIR / filename
    note_path.write_text(note_content, encoding='utf-8')

    # Refresh the notes index so the new note shows up on the board.
    subprocess.run([
        sys.executable, str(TOOLS_DIR / 'update_notes_index.py')
    ], capture_output=True)

    # Fix: the original printed a literal "(unknown)" placeholder here
    # instead of the created filename.
    print(f"Created note: {filename}")
    return filename
def clean_vtt(content):
    """Convert WEBVTT subtitle text into one deduplicated plain-text string.

    Drops VTT headers, cue timing lines, numeric cue indices, tag-only
    lines and blanks; strips inline markup tags; and emits each distinct
    caption line once, in first-seen order, joined by single spaces.
    """
    tag_re = re.compile(r'<[^>]+>')
    emitted = set()
    pieces = []
    for raw in content.split('\n'):
        stripped = raw.strip()
        # Skip structural/non-caption lines.
        if (not stripped
                or raw.startswith(('WEBVTT', 'Kind:', 'Language:'))
                or '-->' in raw
                or stripped.startswith('<')
                or re.fullmatch(r'\d+', stripped)):
            continue
        text = tag_re.sub('', raw).strip()
        # Auto-generated subs repeat lines; keep only the first occurrence.
        if text and text not in emitted:
            emitted.add(text)
            pieces.append(text)
    return ' '.join(pieces)
if __name__ == '__main__':
    # Serve from the dashboard directory so static assets resolve relatively.
    port = 8088
    os.chdir(KANBAN_DIR)
    print(f"Starting Echo Task Board API on port {port}")
    server = HTTPServer(('0.0.0.0', port), TaskBoardHandler)
    server.serve_forever()