#!/usr/bin/env python3 """ Simple API server for Echo Task Board. Handles YouTube summarization requests. """ import json import shutil import subprocess import sys import re import os import signal import uuid from http.server import HTTPServer, SimpleHTTPRequestHandler from urllib.parse import parse_qs, urlparse from datetime import datetime from pathlib import Path # Import habits helpers sys.path.insert(0, str(Path(__file__).parent)) import habits_helpers BASE_DIR = Path(__file__).parent.parent TOOLS_DIR = BASE_DIR / 'tools' NOTES_DIR = BASE_DIR / 'kb' / 'youtube' KANBAN_DIR = BASE_DIR / 'dashboard' WORKSPACE_DIR = Path('/home/moltbot/workspace') HABITS_FILE = KANBAN_DIR / 'habits.json' # Load .env file if present _env_file = Path(__file__).parent / '.env' if _env_file.exists(): for line in _env_file.read_text().splitlines(): line = line.strip() if line and not line.startswith('#') and '=' in line: k, v = line.split('=', 1) os.environ.setdefault(k.strip(), v.strip()) GITEA_URL = os.environ.get('GITEA_URL', 'https://gitea.romfast.ro') GITEA_ORG = os.environ.get('GITEA_ORG', 'romfast') GITEA_TOKEN = os.environ.get('GITEA_TOKEN', '') class TaskBoardHandler(SimpleHTTPRequestHandler): def do_POST(self): if self.path == '/api/youtube': self.handle_youtube() elif self.path == '/api/files': self.handle_files_post() elif self.path == '/api/refresh-index': self.handle_refresh_index() elif self.path == '/api/git-commit': self.handle_git_commit() elif self.path == '/api/pdf': self.handle_pdf_post() elif self.path == '/api/habits': self.handle_habits_post() elif self.path.startswith('/api/habits/') and self.path.endswith('/check'): self.handle_habits_check() elif self.path.startswith('/api/habits/') and self.path.endswith('/skip'): self.handle_habits_skip() elif self.path == '/api/workspace/run': self.handle_workspace_run() elif self.path == '/api/workspace/stop': self.handle_workspace_stop() elif self.path == '/api/workspace/git/commit': self.handle_workspace_git_commit() elif self.path == '/api/workspace/git/push': self.handle_workspace_git_push() elif self.path == '/api/workspace/delete': self.handle_workspace_delete() else: self.send_error(404) def do_PUT(self): if self.path.startswith('/api/habits/'): self.handle_habits_put() else: self.send_error(404) def do_DELETE(self): if self.path.startswith('/api/habits/'): self.handle_habits_delete() else: self.send_error(404) def handle_git_commit(self): """Run git commit and push.""" try: script = TOOLS_DIR / 'git_commit.py' result = subprocess.run( [sys.executable, str(script), '--push'], capture_output=True, text=True, timeout=60, cwd=str(BASE_DIR) ) output = result.stdout + result.stderr # Parse files count files_match = re.search(r'Files changed: (\d+)', output) files = int(files_match.group(1)) if files_match else 0 if result.returncode == 0 or 'Pushing...' in output: self.send_json({ 'success': True, 'files': files, 'output': output }) else: self.send_json({ 'success': False, 'error': output or 'Unknown error' }) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_refresh_index(self): """Regenerate memory/kb/index.json""" try: script = TOOLS_DIR / 'update_notes_index.py' result = subprocess.run( [sys.executable, str(script)], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: # Parse output for stats output = result.stdout total_match = re.search(r'with (\d+) notes', output) total = int(total_match.group(1)) if total_match else 0 self.send_json({ 'success': True, 'message': f'Index regenerat cu {total} notițe', 'total': total, 'output': output }) else: self.send_json({ 'success': False, 'error': result.stderr or 'Unknown error' }, 500) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Timeout'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_files_post(self): """Save file content.""" try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') data = json.loads(post_data) path = data.get('path', '') content = data.get('content', '') # Allow access to clawd and workspace allowed_dirs = [ Path('/home/moltbot/clawd'), Path('/home/moltbot/workspace') ] # Try to resolve against each allowed directory target = None workspace = None for base in allowed_dirs: try: candidate = (base / path).resolve() # Check if candidate is within ANY allowed directory (handles symlinks) if any(str(candidate).startswith(str(d)) for d in allowed_dirs): target = candidate workspace = base break except: continue if target is None: self.send_json({'error': 'Access denied'}, 403) return # Create parent dirs if needed target.parent.mkdir(parents=True, exist_ok=True) # Write file target.write_text(content, encoding='utf-8') self.send_json({ 'status': 'saved', 'path': path, 'size': len(content) }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_pdf_post(self): """Convert markdown to PDF (text-based, not image) using venv script.""" try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') data = json.loads(post_data) markdown_content = data.get('markdown', '') filename = data.get('filename', 'document.pdf') if not markdown_content: self.send_json({'error': 'No markdown content'}, 400) return # Call PDF generator script in venv venv_python = BASE_DIR / 'venv' / 'bin' / 'python3' pdf_script = TOOLS_DIR / 'generate_pdf.py' if not venv_python.exists(): self.send_json({'error': 'Venv Python not found'}, 500) return if not pdf_script.exists(): self.send_json({'error': 'PDF generator script not found'}, 500) return # Prepare input JSON input_data = json.dumps({ 'markdown': markdown_content, 'filename': filename }) # Call script with stdin result = subprocess.run( [str(venv_python), str(pdf_script)], input=input_data.encode('utf-8'), capture_output=True, timeout=30 ) if result.returncode != 0: # Error from script error_msg = result.stderr.decode('utf-8', errors='replace') try: error_json = json.loads(error_msg) self.send_json(error_json, 500) except: self.send_json({'error': error_msg}, 500) return # PDF bytes from stdout pdf_bytes = result.stdout # Send as file download self.send_response(200) self.send_header('Content-Type', 'application/pdf') self.send_header('Content-Disposition', f'attachment; filename="{filename}"') self.send_header('Content-Length', str(len(pdf_bytes))) self.end_headers() self.wfile.write(pdf_bytes) except subprocess.TimeoutExpired: self.send_json({'error': 'PDF generation timeout'}, 500) except Exception as e: self.send_json({'error': str(e)}, 500) def do_GET(self): if self.path == '/api/status': self.send_json({'status': 'ok', 'time': datetime.now().isoformat()}) elif self.path == '/api/git' or self.path.startswith('/api/git?'): self.handle_git_status() elif self.path == '/api/agents' or self.path.startswith('/api/agents?'): self.handle_agents_status() elif self.path == '/api/cron' or self.path.startswith('/api/cron?'): self.handle_cron_status() elif self.path == '/api/activity' or self.path.startswith('/api/activity?'): self.handle_activity() elif self.path == '/api/habits': self.handle_habits_get() elif self.path.startswith('/api/files'): self.handle_files_get() elif self.path.startswith('/api/diff'): self.handle_git_diff() elif self.path == '/api/workspace' or self.path.startswith('/api/workspace?'): self.handle_workspace_list() elif self.path.startswith('/api/workspace/git/diff'): self.handle_workspace_git_diff() elif self.path.startswith('/api/workspace/logs'): self.handle_workspace_logs() elif self.path.startswith('/api/'): self.send_error(404) else: # Serve static files super().do_GET() def handle_git_status(self): """Get git status for dashboard.""" try: workspace = Path('/home/moltbot/clawd') # Get current branch branch = subprocess.run( ['git', 'branch', '--show-current'], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() # Get last commit last_commit = subprocess.run( ['git', 'log', '-1', '--format=%h|%s|%cr'], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() commit_parts = last_commit.split('|') if last_commit else ['', '', ''] # Get uncommitted files status_output = subprocess.run( ['git', 'status', '--short'], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() uncommitted = status_output.split('\n') if status_output else [] uncommitted = [f for f in uncommitted if f.strip()] # Get diff stats if there are uncommitted files diff_stat = '' if uncommitted: diff_stat = subprocess.run( ['git', 'diff', '--stat', '--cached'], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() if not diff_stat: diff_stat = subprocess.run( ['git', 'diff', '--stat'], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() # Parse uncommitted into structured format # Format: XY PATH where XY is 2 chars (index + working tree status) # Examples: "M AGENTS.md" (staged), " M tools.md" (unstaged), "?? file" (untracked) # The format varies: sometimes 1 space after status, sometimes 2 uncommitted_parsed = [] for line in uncommitted: if len(line) >= 2: status = line[:2].strip() # Get 2 chars and strip whitespace filepath = line[2:].strip() # Get everything after position 2 and strip if filepath: # Only add if filepath is not empty uncommitted_parsed.append({'status': status, 'path': filepath}) self.send_json({ 'branch': branch, 'lastCommit': { 'hash': commit_parts[0] if len(commit_parts) > 0 else '', 'message': commit_parts[1] if len(commit_parts) > 1 else '', 'time': commit_parts[2] if len(commit_parts) > 2 else '' }, 'uncommitted': uncommitted, 'uncommittedParsed': uncommitted_parsed, 'uncommittedCount': len(uncommitted), 'diffStat': diff_stat, 'clean': len(uncommitted) == 0 }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_git_diff(self): """Get git diff for a specific file.""" from urllib.parse import urlparse, parse_qs parsed = urlparse(self.path) params = parse_qs(parsed.query) filepath = params.get('path', [''])[0] if not filepath: self.send_json({'error': 'path required'}, 400) return try: workspace = Path('/home/moltbot/clawd') # Security check target = (workspace / filepath).resolve() if not str(target).startswith(str(workspace)): self.send_json({'error': 'Access denied'}, 403) return # Get diff (try staged first, then unstaged) diff = subprocess.run( ['git', 'diff', '--cached', '--', filepath], cwd=workspace, capture_output=True, text=True, timeout=10 ).stdout if not diff: diff = subprocess.run( ['git', 'diff', '--', filepath], cwd=workspace, capture_output=True, text=True, timeout=10 ).stdout # If still no diff, file might be untracked - show full content if not diff: status = subprocess.run( ['git', 'status', '--short', '--', filepath], cwd=workspace, capture_output=True, text=True, timeout=5 ).stdout.strip() if status.startswith('??'): # Untracked file - show as new if target.exists(): content = target.read_text(encoding='utf-8', errors='replace')[:50000] diff = f"+++ b/{filepath}\n" + '\n'.join(f'+{line}' for line in content.split('\n')) self.send_json({ 'path': filepath, 'diff': diff or 'No changes', 'hasDiff': bool(diff) }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_agents_status(self): """Get agents status - fast version reading session files directly.""" try: # Define known agents agents_config = [ {'id': 'echo', 'name': 'Echo', 'emoji': '🌀'}, {'id': 'echo-work', 'name': 'Work', 'emoji': '⚡'}, {'id': 'echo-health', 'name': 'Health', 'emoji': '❤️'}, {'id': 'echo-growth', 'name': 'Growth', 'emoji': '🪜'}, {'id': 'echo-sprijin', 'name': 'Sprijin', 'emoji': '⭕'}, {'id': 'echo-scout', 'name': 'Scout', 'emoji': '⚜️'}, ] # Check active sessions by reading session files directly (fast) active_agents = set() sessions_base = Path.home() / '.clawdbot' / 'agents' if sessions_base.exists(): for agent_dir in sessions_base.iterdir(): if agent_dir.is_dir(): sessions_file = agent_dir / 'sessions' / 'sessions.json' if sessions_file.exists(): try: data = json.loads(sessions_file.read_text()) # sessions.json is an object with session keys now = datetime.now().timestamp() * 1000 for key, sess in data.items(): if isinstance(sess, dict): last_active = sess.get('updatedAt', 0) if now - last_active < 30 * 60 * 1000: # 30 min active_agents.add(agent_dir.name) break except: pass # Build response agents = [] for cfg in agents_config: agents.append({ 'id': cfg['id'], 'name': cfg['name'], 'emoji': cfg['emoji'], 'active': cfg['id'] in active_agents }) self.send_json({'agents': agents}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_cron_status(self): """Get cron jobs status from ~/.clawdbot/cron/jobs.json""" try: jobs_file = Path.home() / '.clawdbot' / 'cron' / 'jobs.json' if not jobs_file.exists(): self.send_json({'jobs': [], 'error': 'No jobs file found'}) return data = json.loads(jobs_file.read_text()) all_jobs = data.get('jobs', []) # Filter enabled jobs and format for dashboard now_ms = datetime.now().timestamp() * 1000 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) today_start_ms = today_start.timestamp() * 1000 jobs = [] for job in all_jobs: if not job.get('enabled', False): continue # Parse cron expression to get time schedule = job.get('schedule', {}) expr = schedule.get('expr', '') # Simple cron parsing for display - convert UTC to Bucharest parts = expr.split() if len(parts) >= 2: minute = parts[0] hour = parts[1] if minute.isdigit() and (hour.isdigit() or '-' in hour): # Handle hour ranges like "7-17" if '-' in hour: hour_start, hour_end = hour.split('-') hour = hour_start # Show first hour # Convert UTC to Bucharest (UTC+2 winter, UTC+3 summer) from datetime import timezone as dt_timezone from zoneinfo import ZoneInfo try: bucharest = ZoneInfo('Europe/Bucharest') utc_hour = int(hour) utc_minute = int(minute) # Create UTC datetime for today utc_dt = datetime.now(dt_timezone.utc).replace(hour=utc_hour, minute=utc_minute, second=0, microsecond=0) local_dt = utc_dt.astimezone(bucharest) time_str = f"{local_dt.hour:02d}:{local_dt.minute:02d}" except: time_str = f"{int(hour):02d}:{int(minute):02d}" else: time_str = expr[:15] else: time_str = expr[:15] # Check if ran today state = job.get('state', {}) last_run = state.get('lastRunAtMs', 0) ran_today = last_run >= today_start_ms last_status = state.get('lastStatus', 'unknown') jobs.append({ 'id': job.get('id'), 'name': job.get('name'), 'agentId': job.get('agentId'), 'time': time_str, 'schedule': expr, 'ranToday': ran_today, 'lastStatus': last_status if ran_today else None, 'lastRunAtMs': last_run, 'nextRunAtMs': state.get('nextRunAtMs') }) # Sort by time jobs.sort(key=lambda j: j['time']) self.send_json({ 'jobs': jobs, 'total': len(jobs), 'ranToday': sum(1 for j in jobs if j['ranToday']) }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_activity(self): """Aggregate activity from multiple sources: cron jobs, git commits, file changes.""" from datetime import timezone as dt_timezone from zoneinfo import ZoneInfo try: activities = [] bucharest = ZoneInfo('Europe/Bucharest') workspace = Path('/home/moltbot/clawd') # 1. Cron jobs ran today try: result = subprocess.run( ['clawdbot', 'cron', 'list', '--json'], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: cron_data = json.loads(result.stdout) today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) today_start_ms = today_start.timestamp() * 1000 for job in cron_data.get('jobs', []): state = job.get('state', {}) last_run = state.get('lastRunAtMs', 0) if last_run >= today_start_ms: run_time = datetime.fromtimestamp(last_run / 1000, tz=dt_timezone.utc) local_time = run_time.astimezone(bucharest) activities.append({ 'type': 'cron', 'icon': 'clock', 'text': f"Job: {job.get('name', 'unknown')}", 'agent': job.get('agentId', 'echo'), 'time': local_time.strftime('%H:%M'), 'timestamp': last_run, 'status': state.get('lastStatus', 'ok') }) except: pass # 2. Git commits (last 24h) try: result = subprocess.run( ['git', 'log', '--oneline', '--since=24 hours ago', '--format=%H|%s|%at'], cwd=workspace, capture_output=True, text=True, timeout=10 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if '|' in line: parts = line.split('|') if len(parts) >= 3: commit_hash, message, timestamp = parts[0], parts[1], int(parts[2]) commit_time = datetime.fromtimestamp(timestamp, tz=dt_timezone.utc) local_time = commit_time.astimezone(bucharest) activities.append({ 'type': 'git', 'icon': 'git-commit', 'text': message[:60] + ('...' if len(message) > 60 else ''), 'agent': 'git', 'time': local_time.strftime('%H:%M'), 'timestamp': timestamp * 1000, 'commitHash': commit_hash[:8] }) except: pass # 2b. Git uncommitted files try: result = subprocess.run( ['git', 'status', '--short'], cwd=workspace, capture_output=True, text=True, timeout=10 ) if result.returncode == 0 and result.stdout.strip(): for line in result.stdout.strip().split('\n'): if len(line) >= 4: # Git status format: XY filename (XY = 2 chars status) # Handle both "M " and " M" formats status = line[:2] # Find filepath - skip status chars and any spaces filepath = line[2:].lstrip() if not filepath: continue status_clean = status.strip() status_labels = {'M': 'modificat', 'A': 'adăugat', 'D': 'șters', '??': 'nou', 'R': 'redenumit'} status_label = status_labels.get(status_clean, status_clean) activities.append({ 'type': 'git-file', 'icon': 'file-diff', 'text': f"{filepath}", 'agent': f"git ({status_label})", 'time': 'acum', 'timestamp': int(datetime.now().timestamp() * 1000), 'path': filepath, 'gitStatus': status_clean }) except: pass # 3. Recent files in memory/kb/ (last 24h) try: kb_dir = workspace / 'kb' cutoff = datetime.now().timestamp() - (24 * 3600) for md_file in kb_dir.rglob('*.md'): stat = md_file.stat() if stat.st_mtime > cutoff: file_time = datetime.fromtimestamp(stat.st_mtime, tz=dt_timezone.utc) local_time = file_time.astimezone(bucharest) rel_path = md_file.relative_to(workspace) activities.append({ 'type': 'file', 'icon': 'file-text', 'text': f"Fișier: {md_file.name}", 'agent': str(rel_path.parent), 'time': local_time.strftime('%H:%M'), 'timestamp': int(stat.st_mtime * 1000), 'path': str(rel_path) }) except: pass # 4. Tasks from tasks.json try: tasks_file = workspace / 'dashboard' / 'tasks.json' if tasks_file.exists(): tasks_data = json.loads(tasks_file.read_text()) for col in tasks_data.get('columns', []): for task in col.get('tasks', []): ts_str = task.get('completed') or task.get('created', '') if ts_str: try: ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) if ts.timestamp() > (datetime.now().timestamp() - 7 * 24 * 3600): local_time = ts.astimezone(bucharest) activities.append({ 'type': 'task', 'icon': 'check-circle' if task.get('completed') else 'circle', 'text': task.get('title', ''), 'agent': task.get('agent', 'Echo'), 'time': local_time.strftime('%d %b %H:%M'), 'timestamp': int(ts.timestamp() * 1000), 'status': 'done' if task.get('completed') else col['id'] }) except: pass except: pass # Sort by timestamp descending activities.sort(key=lambda x: x.get('timestamp', 0), reverse=True) # Limit to 30 items activities = activities[:30] self.send_json({ 'activities': activities, 'total': len(activities) }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_files_get(self): """List files or get file content.""" from urllib.parse import urlparse, parse_qs parsed = urlparse(self.path) params = parse_qs(parsed.query) path = params.get('path', [''])[0] action = params.get('action', ['list'])[0] # Security: only allow access within allowed directories allowed_dirs = [ Path('/home/moltbot/clawd'), Path('/home/moltbot/workspace') ] # Try to resolve against each allowed directory target = None workspace = None for base in allowed_dirs: try: candidate = (base / path).resolve() # Check if candidate is within ANY allowed directory (handles symlinks) if any(str(candidate).startswith(str(d)) for d in allowed_dirs): target = candidate workspace = base break except: continue if target is None: self.send_json({'error': 'Access denied'}, 403) return if action == 'list': if not target.exists(): self.send_json({'error': 'Path not found'}, 404) return if target.is_file(): # Return file content try: content = target.read_text(encoding='utf-8', errors='replace') self.send_json({ 'type': 'file', 'path': path, 'name': target.name, 'content': content[:100000], # Limit to 100KB 'size': target.stat().st_size, 'truncated': target.stat().st_size > 100000 }) except Exception as e: self.send_json({'error': str(e)}, 500) else: # List directory items = [] try: for item in sorted(target.iterdir()): stat = item.stat() # Build relative path from original request path item_path = f"{path}/{item.name}" if path else item.name items.append({ 'name': item.name, 'type': 'dir' if item.is_dir() else 'file', 'size': stat.st_size if item.is_file() else None, 'mtime': stat.st_mtime, 'path': item_path }) self.send_json({ 'type': 'dir', 'path': path, 'items': items }) except Exception as e: self.send_json({'error': str(e)}, 500) else: self.send_json({'error': 'Unknown action'}, 400) def handle_workspace_list(self): """List projects in ~/workspace/ with Ralph status, git info, etc.""" try: projects = [] if not WORKSPACE_DIR.exists(): self.send_json({'projects': []}) return for project_dir in sorted(WORKSPACE_DIR.iterdir()): if not project_dir.is_dir() or project_dir.name.startswith('.'): continue ralph_dir = project_dir / 'scripts' / 'ralph' prd_json = ralph_dir / 'prd.json' tasks_dir = project_dir / 'tasks' proj = { 'name': project_dir.name, 'path': str(project_dir), 'hasRalph': ralph_dir.exists(), 'hasPrd': any(tasks_dir.glob('prd-*.md')) if tasks_dir.exists() else False, 'hasMain': (project_dir / 'main.py').exists(), 'hasVenv': (project_dir / 'venv').exists(), 'hasReadme': (project_dir / 'README.md').exists(), 'ralph': None, 'process': {'running': False, 'pid': None, 'port': None}, 'git': None } # Ralph status if prd_json.exists(): try: prd = json.loads(prd_json.read_text()) stories = prd.get('userStories', []) complete = sum(1 for s in stories if s.get('passes')) # Check ralph PID ralph_pid = None ralph_running = False pid_file = ralph_dir / '.ralph.pid' if pid_file.exists(): try: pid = int(pid_file.read_text().strip()) os.kill(pid, 0) # Check if alive ralph_running = True ralph_pid = pid except (ValueError, ProcessLookupError, PermissionError): pass # Last iteration time from logs last_iter = None logs_dir = ralph_dir / 'logs' if logs_dir.exists(): log_files = sorted(logs_dir.glob('iteration-*.log'), key=lambda f: f.stat().st_mtime, reverse=True) if log_files: mtime = log_files[0].stat().st_mtime last_iter = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M') tech = prd.get('techStack', {}) proj['ralph'] = { 'running': ralph_running, 'pid': ralph_pid, 'storiesTotal': len(stories), 'storiesComplete': complete, 'lastIteration': last_iter, 'stories': [ {'id': s.get('id', ''), 'title': s.get('title', ''), 'passes': s.get('passes', False)} for s in stories ] } proj['techStack'] = { 'type': tech.get('type', ''), 'commands': tech.get('commands', {}), 'port': tech.get('port'), } except (json.JSONDecodeError, IOError): pass # Check if main.py is running if proj['hasMain']: try: result = subprocess.run( ['pgrep', '-f', f'python.*{project_dir.name}/main.py'], capture_output=True, text=True, timeout=3 ) if result.stdout.strip(): pids = result.stdout.strip().split('\n') port = None if prd_json.exists(): try: prd_data = json.loads(prd_json.read_text()) port = prd_data.get('techStack', {}).get('port') except (json.JSONDecodeError, IOError): pass proj['process'] = { 'running': True, 'pid': int(pids[0]), 'port': port } except Exception: pass # Git info if (project_dir / '.git').exists(): try: branch = subprocess.run( ['git', 'branch', '--show-current'], cwd=project_dir, capture_output=True, text=True, timeout=5 ).stdout.strip() last_commit = subprocess.run( ['git', 'log', '-1', '--format=%h - %s'], cwd=project_dir, capture_output=True, text=True, timeout=5 ).stdout.strip() status_out = subprocess.run( ['git', 'status', '--short'], cwd=project_dir, capture_output=True, text=True, timeout=5 ).stdout.strip() uncommitted = len([l for l in status_out.split('\n') if l.strip()]) if status_out else 0 proj['git'] = { 'branch': branch, 'lastCommit': last_commit, 'uncommitted': uncommitted } except Exception: pass projects.append(proj) self.send_json({'projects': projects}) except Exception as e: self.send_json({'error': str(e)}, 500) def _read_post_json(self): """Helper to read JSON POST body.""" content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') return json.loads(post_data) def _validate_project(self, name): """Validate project name and return its path, or None.""" if not name or '/' in name or '..' in name: return None project_dir = WORKSPACE_DIR / name if not project_dir.exists() or not project_dir.is_dir(): return None # Ensure it resolves within workspace if not str(project_dir.resolve()).startswith(str(WORKSPACE_DIR)): return None return project_dir def handle_workspace_run(self): """Start a project process (main.py, ralph.sh, or pytest).""" try: data = self._read_post_json() project_name = data.get('project', '') command = data.get('command', '') project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return allowed_commands = {'main', 'ralph', 'test'} if command not in allowed_commands: self.send_json({'success': False, 'error': f'Invalid command. Allowed: {", ".join(allowed_commands)}'}, 400) return ralph_dir = project_dir / 'scripts' / 'ralph' if command == 'main': main_py = project_dir / 'main.py' if not main_py.exists(): self.send_json({'success': False, 'error': 'No main.py found'}, 404) return # Use venv python if available venv_python = project_dir / 'venv' / 'bin' / 'python' python_cmd = str(venv_python) if venv_python.exists() else sys.executable log_path = ralph_dir / 'logs' / 'main.log' if ralph_dir.exists() else project_dir / 'main.log' log_path.parent.mkdir(parents=True, exist_ok=True) with open(log_path, 'a') as log_file: proc = subprocess.Popen( [python_cmd, 'main.py'], cwd=str(project_dir), stdout=log_file, stderr=log_file, start_new_session=True ) self.send_json({'success': True, 'pid': proc.pid, 'log': str(log_path)}) elif command == 'ralph': ralph_sh = ralph_dir / 'ralph.sh' if not ralph_sh.exists(): self.send_json({'success': False, 'error': 'No ralph.sh found'}, 404) return log_path = ralph_dir / 'logs' / 'ralph.log' log_path.parent.mkdir(parents=True, exist_ok=True) with open(log_path, 'a') as log_file: proc = subprocess.Popen( ['bash', str(ralph_sh)], cwd=str(project_dir), stdout=log_file, stderr=log_file, start_new_session=True ) # Write PID pid_file = ralph_dir / '.ralph.pid' pid_file.write_text(str(proc.pid)) self.send_json({'success': True, 'pid': proc.pid, 'log': str(log_path)}) elif command == 'test': # Run pytest synchronously (with timeout) venv_python = project_dir / 'venv' / 'bin' / 'python' python_cmd = str(venv_python) if venv_python.exists() else sys.executable result = subprocess.run( [python_cmd, '-m', 'pytest', '-v', '--tb=short'], cwd=str(project_dir), capture_output=True, text=True, timeout=120 ) self.send_json({ 'success': result.returncode == 0, 'output': result.stdout + result.stderr, 'returncode': result.returncode }) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Test timeout (120s)'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_workspace_stop(self): """Stop a project process.""" try: data = self._read_post_json() project_name = data.get('project', '') target = data.get('target', '') project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if target not in ('main', 'ralph'): self.send_json({'success': False, 'error': 'Invalid target. Use: main, ralph'}, 400) return if target == 'ralph': pid_file = project_dir / 'scripts' / 'ralph' / '.ralph.pid' if pid_file.exists(): try: pid = int(pid_file.read_text().strip()) # Verify the process belongs to our user and is within workspace proc_cwd = Path(f'/proc/{pid}/cwd').resolve() if str(proc_cwd).startswith(str(WORKSPACE_DIR)): os.killpg(os.getpgid(pid), signal.SIGTERM) self.send_json({'success': True, 'message': f'Ralph stopped (PID {pid})'}) else: self.send_json({'success': False, 'error': 'Process not in workspace'}, 403) except ProcessLookupError: self.send_json({'success': True, 'message': 'Process already stopped'}) except PermissionError: self.send_json({'success': False, 'error': 'Permission denied'}, 403) else: self.send_json({'success': False, 'error': 'No PID file found'}, 404) elif target == 'main': # Find main.py process for this project try: result = subprocess.run( ['pgrep', '-f', f'python.*{project_dir.name}/main.py'], capture_output=True, text=True, timeout=3 ) if result.stdout.strip(): pid = int(result.stdout.strip().split('\n')[0]) proc_cwd = Path(f'/proc/{pid}/cwd').resolve() if str(proc_cwd).startswith(str(WORKSPACE_DIR)): os.kill(pid, signal.SIGTERM) self.send_json({'success': True, 'message': f'Main stopped (PID {pid})'}) else: self.send_json({'success': False, 'error': 'Process not in workspace'}, 403) else: self.send_json({'success': True, 'message': 'No running process found'}) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_workspace_git_diff(self): """Get git diff for a workspace project.""" try: parsed = urlparse(self.path) params = parse_qs(parsed.query) project_name = params.get('project', [''])[0] project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'error': 'Not a git repository'}, 400) return status = subprocess.run( ['git', 'status', '--short'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ).stdout.strip() diff = subprocess.run( ['git', 'diff'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ).stdout diff_cached = subprocess.run( ['git', 'diff', '--cached'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ).stdout combined_diff = '' if diff_cached: combined_diff += '=== Staged Changes ===\n' + diff_cached if diff: if combined_diff: combined_diff += '\n' combined_diff += '=== Unstaged Changes ===\n' + diff self.send_json({ 'project': project_name, 'status': status, 'diff': combined_diff, 'hasDiff': bool(status) }) except subprocess.TimeoutExpired: self.send_json({'error': 'Timeout'}, 500) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_workspace_git_commit(self): """Commit all changes in a workspace project.""" try: data = self._read_post_json() project_name = data.get('project', '') message = data.get('message', '').strip() project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'success': False, 'error': 'Not a git repository'}, 400) return # Check if there's anything to commit porcelain = subprocess.run( ['git', 'status', '--porcelain'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ).stdout.strip() if not porcelain: self.send_json({'success': False, 'error': 'Nothing to commit'}) return files_changed = len([l for l in porcelain.split('\n') if l.strip()]) # Auto-message if empty if not message: now = datetime.now().strftime('%Y-%m-%d %H:%M') message = f'Update: {now} ({files_changed} files)' # Stage all and commit subprocess.run( ['git', 'add', '-A'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ) result = subprocess.run( ['git', 'commit', '-m', message], cwd=str(project_dir), capture_output=True, text=True, timeout=30 ) output = result.stdout + result.stderr if result.returncode == 0: self.send_json({ 'success': True, 'message': message, 'output': output, 'filesChanged': files_changed }) else: self.send_json({'success': False, 'error': output or 'Commit failed'}) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Timeout'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def _ensure_gitea_remote(self, project_dir, project_name): """Create Gitea repo and add remote if no origin exists. Returns (ok, message).""" import urllib.request if not GITEA_TOKEN: return False, 'GITEA_TOKEN not set' # Create repo via Gitea API api_url = f'{GITEA_URL}/api/v1/orgs/{GITEA_ORG}/repos' payload = json.dumps({'name': project_name, 'private': True, 'auto_init': False}).encode() req = urllib.request.Request(api_url, data=payload, method='POST', headers={ 'Authorization': f'token {GITEA_TOKEN}', 'Content-Type': 'application/json' }) try: resp = urllib.request.urlopen(req, timeout=15) resp.read() except urllib.error.HTTPError as e: body = e.read().decode(errors='replace') if e.code == 409: pass # repo already exists, fine else: return False, f'Gitea API error {e.code}: {body}' # Add remote with token auth remote_url = f'{GITEA_URL}/{GITEA_ORG}/{project_name}.git' # Insert token into URL for push auth auth_url = remote_url.replace('https://', f'https://gitea:{GITEA_TOKEN}@') subprocess.run( ['git', 'remote', 'add', 'origin', auth_url], cwd=str(project_dir), capture_output=True, text=True, timeout=5 ) return True, f'Created repo {GITEA_ORG}/{project_name}' def handle_workspace_git_push(self): """Push a workspace project to its remote, creating Gitea repo if needed.""" try: data = self._read_post_json() project_name = data.get('project', '') project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'success': False, 'error': 'Not a git repository'}, 400) return created_msg = '' # Check remote exists, create if not remote_check = subprocess.run( ['git', 'remote', 'get-url', 'origin'], cwd=str(project_dir), capture_output=True, text=True, timeout=10 ) if remote_check.returncode != 0: ok, msg = self._ensure_gitea_remote(project_dir, project_name) if not ok: self.send_json({'success': False, 'error': msg}) return created_msg = msg + '\n' # Push (set upstream on first push) result = subprocess.run( ['git', 'push', '-u', 'origin', 'HEAD'], cwd=str(project_dir), capture_output=True, text=True, timeout=60 ) output = result.stdout + result.stderr if result.returncode == 0: self.send_json({'success': True, 'output': created_msg + (output or 'Pushed successfully')}) else: self.send_json({'success': False, 'error': output or 'Push failed'}) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Push timeout (60s)'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_workspace_delete(self): """Delete a workspace project.""" try: data = self._read_post_json() project_name = data.get('project', '') confirm = data.get('confirm', '') project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if confirm != project_name: self.send_json({'success': False, 'error': 'Confirmation does not match project name'}, 400) return # Check for running processes try: result = subprocess.run( ['pgrep', '-f', f'{project_dir.name}/(main\\.py|ralph)'], capture_output=True, text=True, timeout=5 ) if result.stdout.strip(): self.send_json({'success': False, 'error': 'Project has running processes. Stop them first.'}) return except subprocess.TimeoutExpired: pass shutil.rmtree(str(project_dir)) self.send_json({ 'success': True, 'message': f'Project {project_name} deleted' }) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def handle_workspace_logs(self): """Get last N lines from a project log.""" try: parsed = urlparse(self.path) params = parse_qs(parsed.query) project_name = params.get('project', [''])[0] log_type = params.get('type', ['ralph'])[0] lines_count = min(int(params.get('lines', ['100'])[0]), 500) project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'error': 'Invalid project'}, 400) return ralph_dir = project_dir / 'scripts' / 'ralph' # Determine log file if log_type == 'ralph': log_file = ralph_dir / 'logs' / 'ralph.log' if not log_file.exists(): # Try ralph-test.log log_file = ralph_dir / 'logs' / 'ralph-test.log' elif log_type == 'main': log_file = ralph_dir / 'logs' / 'main.log' if ralph_dir.exists() else project_dir / 'main.log' elif log_type == 'progress': log_file = ralph_dir / 'progress.txt' else: # Try iteration log if log_type.startswith('iteration-'): log_file = ralph_dir / 'logs' / f'{log_type}.log' else: self.send_json({'error': 'Invalid log type'}, 400) return if not log_file.exists(): self.send_json({ 'project': project_name, 'type': log_type, 'lines': [], 'total': 0 }) return # Security: ensure path is within workspace if not str(log_file.resolve()).startswith(str(WORKSPACE_DIR)): self.send_json({'error': 'Access denied'}, 403) return content = log_file.read_text(encoding='utf-8', errors='replace') all_lines = content.split('\n') total = len(all_lines) last_lines = all_lines[-lines_count:] if len(all_lines) > lines_count else all_lines self.send_json({ 'project': project_name, 'type': log_type, 'lines': last_lines, 'total': total }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_youtube(self): try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') data = json.loads(post_data) url = data.get('url', '').strip() if not url or 'youtube.com' not in url and 'youtu.be' not in url: self.send_json({'error': 'URL YouTube invalid'}, 400) return # Process synchronously (simpler, avoids fork issues) try: print(f"Processing YouTube URL: {url}") result = process_youtube(url) print(f"Processing result: {result}") self.send_json({ 'status': 'done', 'message': 'Notița a fost creată! Refresh pagina Notes.' }) except Exception as e: import traceback print(f"YouTube processing error: {e}") traceback.print_exc() self.send_json({ 'status': 'error', 'message': f'Eroare: {str(e)}' }, 500) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_get(self): """Get all habits with enriched stats.""" try: # Read habits file if not HABITS_FILE.exists(): self.send_json([]) return with open(HABITS_FILE, 'r', encoding='utf-8') as f: data = json.load(f) habits = data.get('habits', []) # Enrich each habit with calculated stats enriched_habits = [] for habit in habits: # Calculate stats using helpers current_streak = habits_helpers.calculate_streak(habit) best_streak = habit.get('streak', {}).get('best', 0) completion_rate = habits_helpers.get_completion_rate(habit, days=30) weekly_summary = habits_helpers.get_weekly_summary(habit) # Add stats to habit enriched = habit.copy() enriched['current_streak'] = current_streak enriched['best_streak'] = best_streak enriched['completion_rate_30d'] = completion_rate enriched['weekly_summary'] = weekly_summary enriched['should_check_today'] = habits_helpers.should_check_today(habit) enriched_habits.append(enriched) # Sort by priority ascending (lower number = higher priority) enriched_habits.sort(key=lambda h: h.get('priority', 999)) self.send_json(enriched_habits) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_post(self): """Create a new habit.""" try: # Read request body content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') data = json.loads(post_data) # Validate required fields name = data.get('name', '').strip() if not name: self.send_json({'error': 'name is required'}, 400) return if len(name) > 100: self.send_json({'error': 'name must be max 100 characters'}, 400) return # Validate color (hex format) color = data.get('color', '#3b82f6') if color and not re.match(r'^#[0-9A-Fa-f]{6}$', color): self.send_json({'error': 'color must be valid hex format (#RRGGBB)'}, 400) return # Validate frequency type frequency_type = data.get('frequency', {}).get('type', 'daily') valid_types = ['daily', 'specific_days', 'x_per_week', 'weekly', 'monthly', 'custom'] if frequency_type not in valid_types: self.send_json({'error': f'frequency.type must be one of: {", ".join(valid_types)}'}, 400) return # Create new habit habit_id = str(uuid.uuid4()) now = datetime.now().isoformat() new_habit = { 'id': habit_id, 'name': name, 'category': data.get('category', 'other'), 'color': color, 'icon': data.get('icon', 'check-circle'), 'priority': data.get('priority', 5), 'notes': data.get('notes', ''), 'reminderTime': data.get('reminderTime', ''), 'frequency': data.get('frequency', {'type': 'daily'}), 'streak': { 'current': 0, 'best': 0, 'lastCheckIn': None }, 'lives': 3, 'completions': [], 'createdAt': now, 'updatedAt': now } # Read existing habits if HABITS_FILE.exists(): with open(HABITS_FILE, 'r', encoding='utf-8') as f: habits_data = json.load(f) else: habits_data = {'lastUpdated': '', 'habits': []} # Add new habit habits_data['habits'].append(new_habit) habits_data['lastUpdated'] = now # Save to file with open(HABITS_FILE, 'w', encoding='utf-8') as f: json.dump(habits_data, f, indent=2) # Return created habit with 201 status self.send_json(new_habit, 201) except json.JSONDecodeError: self.send_json({'error': 'Invalid JSON'}, 400) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_put(self): """Update an existing habit.""" try: # Extract habit ID from path path_parts = self.path.split('/') if len(path_parts) < 4: self.send_json({'error': 'Invalid path'}, 400) return habit_id = path_parts[3] # Read request body content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length).decode('utf-8') data = json.loads(post_data) # Read existing habits if not HABITS_FILE.exists(): self.send_json({'error': 'Habit not found'}, 404) return with open(HABITS_FILE, 'r', encoding='utf-8') as f: habits_data = json.load(f) # Find habit to update habits = habits_data.get('habits', []) habit_index = None for i, habit in enumerate(habits): if habit['id'] == habit_id: habit_index = i break if habit_index is None: self.send_json({'error': 'Habit not found'}, 404) return # Validate allowed fields allowed_fields = ['name', 'category', 'color', 'icon', 'priority', 'notes', 'frequency', 'reminderTime'] # Validate name if provided if 'name' in data: name = data['name'].strip() if not name: self.send_json({'error': 'name cannot be empty'}, 400) return if len(name) > 100: self.send_json({'error': 'name must be max 100 characters'}, 400) return # Validate color if provided if 'color' in data: color = data['color'] if color and not re.match(r'^#[0-9A-Fa-f]{6}$', color): self.send_json({'error': 'color must be valid hex format (#RRGGBB)'}, 400) return # Validate frequency type if provided if 'frequency' in data: frequency_type = data.get('frequency', {}).get('type', 'daily') valid_types = ['daily', 'specific_days', 'x_per_week', 'weekly', 'monthly', 'custom'] if frequency_type not in valid_types: self.send_json({'error': f'frequency.type must be one of: {", ".join(valid_types)}'}, 400) return # Update only allowed fields habit = habits[habit_index] for field in allowed_fields: if field in data: habit[field] = data[field] # Update timestamp habit['updatedAt'] = datetime.now().isoformat() # Save to file habits_data['lastUpdated'] = datetime.now().isoformat() with open(HABITS_FILE, 'w', encoding='utf-8') as f: json.dump(habits_data, f, indent=2) # Return updated habit self.send_json(habit) except json.JSONDecodeError: self.send_json({'error': 'Invalid JSON'}, 400) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_delete(self): """Delete a habit.""" try: # Extract habit ID from path path_parts = self.path.split('/') if len(path_parts) < 4: self.send_json({'error': 'Invalid path'}, 400) return habit_id = path_parts[3] # Read existing habits if not HABITS_FILE.exists(): self.send_json({'error': 'Habit not found'}, 404) return with open(HABITS_FILE, 'r', encoding='utf-8') as f: habits_data = json.load(f) # Find and remove habit habits = habits_data.get('habits', []) habit_found = False for i, habit in enumerate(habits): if habit['id'] == habit_id: habits.pop(i) habit_found = True break if not habit_found: self.send_json({'error': 'Habit not found'}, 404) return # Save to file habits_data['lastUpdated'] = datetime.now().isoformat() with open(HABITS_FILE, 'w', encoding='utf-8') as f: json.dump(habits_data, f, indent=2) # Return 204 No Content self.send_response(204) self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_check(self): """Check in on a habit (complete it for today).""" try: # Extract habit ID from path (/api/habits/{id}/check) path_parts = self.path.split('/') if len(path_parts) < 5: self.send_json({'error': 'Invalid path'}, 400) return habit_id = path_parts[3] # Read optional body (note, rating, mood) body_data = {} content_length = self.headers.get('Content-Length') if content_length: post_data = self.rfile.read(int(content_length)).decode('utf-8') if post_data.strip(): try: body_data = json.loads(post_data) except json.JSONDecodeError: self.send_json({'error': 'Invalid JSON'}, 400) return # Read existing habits if not HABITS_FILE.exists(): self.send_json({'error': 'Habit not found'}, 404) return with open(HABITS_FILE, 'r', encoding='utf-8') as f: habits_data = json.load(f) # Find habit habit = None for h in habits_data.get('habits', []): if h['id'] == habit_id: habit = h break if not habit: self.send_json({'error': 'Habit not found'}, 404) return # Verify habit is relevant for today if not habits_helpers.should_check_today(habit): self.send_json({'error': 'Habit is not relevant for today based on its frequency'}, 400) return # Verify not already checked today today = datetime.now().date().isoformat() completions = habit.get('completions', []) for completion in completions: if completion.get('date') == today: self.send_json({'error': 'Habit already checked in today'}, 409) return # Create completion entry completion_entry = { 'date': today, 'type': 'check' # Distinguish from 'skip' for life restore logic } # Add optional fields if 'note' in body_data: completion_entry['note'] = body_data['note'] if 'rating' in body_data: rating = body_data['rating'] if not isinstance(rating, int) or rating < 1 or rating > 5: self.send_json({'error': 'rating must be an integer between 1 and 5'}, 400) return completion_entry['rating'] = rating if 'mood' in body_data: mood = body_data['mood'] if mood not in ['happy', 'neutral', 'sad']: self.send_json({'error': 'mood must be one of: happy, neutral, sad'}, 400) return completion_entry['mood'] = mood # Add completion to habit habit['completions'].append(completion_entry) # Recalculate streak current_streak = habits_helpers.calculate_streak(habit) habit['streak']['current'] = current_streak # Update best streak if current is higher if current_streak > habit['streak']['best']: habit['streak']['best'] = current_streak # Update lastCheckIn habit['streak']['lastCheckIn'] = today # Check for life restore: if last 7 completions are all check-ins (no skips) and lives < 3 if habit.get('lives', 3) < 3: recent_completions = sorted( habit['completions'], key=lambda x: x.get('date', ''), reverse=True )[:7] # Check if we have 7 completions and all are check-ins (not skips) if len(recent_completions) == 7: all_checks = all(c.get('type') == 'check' for c in recent_completions) if all_checks: habit['lives'] = min(habit['lives'] + 1, 3) # Update timestamp habit['updatedAt'] = datetime.now().isoformat() habits_data['lastUpdated'] = habit['updatedAt'] # Save to file with open(HABITS_FILE, 'w', encoding='utf-8') as f: json.dump(habits_data, f, indent=2) # Return updated habit self.send_json(habit, 200) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_habits_skip(self): """Skip a day using a life to preserve streak.""" try: # Extract habit ID from path (/api/habits/{id}/skip) path_parts = self.path.split('/') if len(path_parts) < 5: self.send_json({'error': 'Invalid path'}, 400) return habit_id = path_parts[3] # Read existing habits if not HABITS_FILE.exists(): self.send_json({'error': 'Habit not found'}, 404) return with open(HABITS_FILE, 'r', encoding='utf-8') as f: habits_data = json.load(f) # Find habit habit = None for h in habits_data.get('habits', []): if h['id'] == habit_id: habit = h break if not habit: self.send_json({'error': 'Habit not found'}, 404) return # Verify lives > 0 current_lives = habit.get('lives', 3) if current_lives <= 0: self.send_json({'error': 'No lives remaining'}, 400) return # Decrement lives by 1 habit['lives'] = current_lives - 1 # Add completion entry with type='skip' today = datetime.now().date().isoformat() completion_entry = { 'date': today, 'type': 'skip' } habit['completions'].append(completion_entry) # Update timestamp habit['updatedAt'] = datetime.now().isoformat() habits_data['lastUpdated'] = habit['updatedAt'] # Save to file with open(HABITS_FILE, 'w', encoding='utf-8') as f: json.dump(habits_data, f, indent=2) # Return updated habit self.send_json(habit, 200) except Exception as e: self.send_json({'error': str(e)}, 500) def send_json(self, data, code=200): self.send_response(code) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.send_header('Pragma', 'no-cache') self.send_header('Expires', '0') self.end_headers() self.wfile.write(json.dumps(data).encode()) def do_OPTIONS(self): self.send_response(200) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.end_headers() def process_youtube(url): """Download subtitles, summarize, save note.""" import time # Get video info and subtitles yt_dlp = os.path.expanduser('~/.local/bin/yt-dlp') # Get title result = subprocess.run( [yt_dlp, '--dump-json', '--no-download', url], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: print(f"Failed to get video info: {result.stderr}") return info = json.loads(result.stdout) title = info.get('title', 'Unknown') duration = info.get('duration', 0) video_id = info.get('id', 'unknown') # Download subtitles temp_dir = Path('/tmp/yt_subs') temp_dir.mkdir(exist_ok=True) for f in temp_dir.glob('*'): f.unlink() subprocess.run([ yt_dlp, '--write-auto-subs', '--sub-langs', 'en', '--skip-download', '--sub-format', 'vtt', '-o', str(temp_dir / '%(id)s'), url ], capture_output=True, timeout=120) # Find and read subtitle file transcript = None for sub_file in temp_dir.glob('*.vtt'): content = sub_file.read_text(encoding='utf-8', errors='replace') transcript = clean_vtt(content) break if not transcript: print("No subtitles found") return # Create note filename date_str = datetime.now().strftime('%Y-%m-%d') slug = re.sub(r'[^\w\s-]', '', title.lower())[:50].strip().replace(' ', '-') filename = f"{date_str}_{slug}.md" # Create simple note (without AI summary for now - just transcript) note_content = f"""# {title} **Video:** {url} **Duration:** {duration // 60}:{duration % 60:02d} **Saved:** {date_str} **Tags:** #youtube #to-summarize --- ## Transcript {transcript[:15000]} --- *Notă: Sumarizarea va fi adăugată de Echo.* """ # Save note NOTES_DIR.mkdir(parents=True, exist_ok=True) note_path = NOTES_DIR / filename note_path.write_text(note_content, encoding='utf-8') # Update index subprocess.run([ sys.executable, str(TOOLS_DIR / 'update_notes_index.py') ], capture_output=True) # Add task to kanban subprocess.run([ sys.executable, str(KANBAN_DIR / 'update_task.py'), 'add', 'in-progress', f'Sumarizare: {title[:30]}...', url, 'medium' ], capture_output=True) print(f"Created note: {filename}") return filename def clean_vtt(content): """Convert VTT to plain text.""" lines = [] seen = set() for line in content.split('\n'): if any([ line.startswith('WEBVTT'), line.startswith('Kind:'), line.startswith('Language:'), '-->' in line, line.strip().startswith('<'), not line.strip(), re.match(r'^\d+$', line.strip()) ]): continue clean = re.sub(r'<[^>]+>', '', line).strip() if clean and clean not in seen: seen.add(clean) lines.append(clean) return ' '.join(lines) if __name__ == '__main__': port = 8088 os.chdir(KANBAN_DIR) print(f"Starting Echo Task Board API on port {port}") httpd = HTTPServer(('0.0.0.0', port), TaskBoardHandler) httpd.serve_forever()