"""Git status / diff / commit handlers for dashboard + workspace projects.""" import json import subprocess import urllib.error import urllib.request from datetime import datetime from urllib.parse import parse_qs, urlparse import constants class GitHandlers: """Mixin providing git status/diff/commit endpoints.""" # ── shared helper ──────────────────────────────────────────── def _run_git(self, workspace, args, timeout=5): """Run a git command in workspace. Returns CompletedProcess.""" return subprocess.run( ['git', *args], cwd=str(workspace), capture_output=True, text=True, timeout=timeout, ) # ── /api/git (dashboard repo) ─────────────────────────────── def handle_git_status(self): """Get git status for the echo-core repo.""" try: workspace = constants.GIT_WORKSPACE branch = self._run_git(workspace, ['branch', '--show-current']).stdout.strip() last_commit = self._run_git(workspace, ['log', '-1', '--format=%h|%s|%cr']).stdout.strip() commit_parts = last_commit.split('|') if last_commit else ['', '', ''] status_output = self._run_git(workspace, ['status', '--short']).stdout.strip() uncommitted = [f for f in status_output.split('\n') if f.strip()] if status_output else [] diff_stat = '' if uncommitted: diff_stat = self._run_git(workspace, ['diff', '--stat', '--cached']).stdout.strip() if not diff_stat: diff_stat = self._run_git(workspace, ['diff', '--stat']).stdout.strip() uncommitted_parsed = [] for line in uncommitted: if len(line) >= 2: status = line[:2].strip() filepath = line[2:].strip() if filepath: uncommitted_parsed.append({'status': status, 'path': filepath}) self.send_json({ 'branch': branch, 'lastCommit': { 'hash': commit_parts[0] if len(commit_parts) > 0 else '', 'message': commit_parts[1] if len(commit_parts) > 1 else '', 'time': commit_parts[2] if len(commit_parts) > 2 else '', }, 'uncommitted': uncommitted, 'uncommittedParsed': uncommitted_parsed, 'uncommittedCount': len(uncommitted), 'diffStat': diff_stat, 'clean': len(uncommitted) == 0, }) except Exception as e: self.send_json({'error': str(e)}, 500) # ── /api/diff ──────────────────────────────────────────────── def handle_git_diff(self): """Get git diff for a specific file.""" params = parse_qs(urlparse(self.path).query) filepath = params.get('path', [''])[0] if not filepath: self.send_json({'error': 'path required'}, 400) return try: workspace = constants.GIT_WORKSPACE target = (workspace / filepath).resolve() if not str(target).startswith(str(workspace)): self.send_json({'error': 'Access denied'}, 403) return diff = self._run_git(workspace, ['diff', '--cached', '--', filepath], timeout=10).stdout if not diff: diff = self._run_git(workspace, ['diff', '--', filepath], timeout=10).stdout if not diff: status = self._run_git(workspace, ['status', '--short', '--', filepath]).stdout.strip() if status.startswith('??') and target.exists(): content = target.read_text(encoding='utf-8', errors='replace')[:50000] diff = f"+++ b/{filepath}\n" + '\n'.join(f'+{line}' for line in content.split('\n')) self.send_json({ 'path': filepath, 'diff': diff or 'No changes', 'hasDiff': bool(diff), }) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_eco_git_commit(self): """Run git add, commit, and push for echo-core repo.""" try: workspace = constants.ECHO_CORE_DIR self._run_git(workspace, ['add', '-A'], timeout=10) status = self._run_git(workspace, ['status', '--porcelain']).stdout.strip() if not status: self.send_json({'success': True, 'files': 0, 'output': 'Nothing to commit'}) return files_count = len([l for l in status.split('\n') if l.strip()]) commit_result = self._run_git(workspace, ['commit', '-m', 'chore: auto-commit from dashboard'], timeout=30) push_result = self._run_git(workspace, ['push'], timeout=30) output = commit_result.stdout + commit_result.stderr + push_result.stdout + push_result.stderr if commit_result.returncode == 0: self.send_json({'success': True, 'files': files_count, 'output': output}) else: self.send_json({'success': False, 'error': output or 'Commit failed'}) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) # ── /api/workspace/git/* (per-project) ─────────────────────── def handle_workspace_git_diff(self): """Get git diff for a workspace project.""" try: params = parse_qs(urlparse(self.path).query) project_name = params.get('project', [''])[0] project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'error': 'Not a git repository'}, 400) return status = self._run_git(project_dir, ['status', '--short'], timeout=10).stdout.strip() diff = self._run_git(project_dir, ['diff'], timeout=10).stdout diff_cached = self._run_git(project_dir, ['diff', '--cached'], timeout=10).stdout combined_diff = '' if diff_cached: combined_diff += '=== Staged Changes ===\n' + diff_cached if diff: if combined_diff: combined_diff += '\n' combined_diff += '=== Unstaged Changes ===\n' + diff self.send_json({ 'project': project_name, 'status': status, 'diff': combined_diff, 'hasDiff': bool(status), }) except subprocess.TimeoutExpired: self.send_json({'error': 'Timeout'}, 500) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_workspace_git_commit(self): """Commit all changes in a workspace project.""" try: data = self._read_post_json() project_name = data.get('project', '') message = data.get('message', '').strip() project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'success': False, 'error': 'Not a git repository'}, 400) return porcelain = self._run_git(project_dir, ['status', '--porcelain'], timeout=10).stdout.strip() if not porcelain: self.send_json({'success': False, 'error': 'Nothing to commit'}) return files_changed = len([l for l in porcelain.split('\n') if l.strip()]) if not message: now = datetime.now().strftime('%Y-%m-%d %H:%M') message = f'Update: {now} ({files_changed} files)' self._run_git(project_dir, ['add', '-A'], timeout=10) result = self._run_git(project_dir, ['commit', '-m', message], timeout=30) output = result.stdout + result.stderr if result.returncode == 0: self.send_json({ 'success': True, 'message': message, 'output': output, 'filesChanged': files_changed, }) else: self.send_json({'success': False, 'error': output or 'Commit failed'}) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Timeout'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500) def _ensure_gitea_remote(self, project_dir, project_name): """Create Gitea repo and add remote if no origin exists. Returns (ok, message).""" if not constants.GITEA_TOKEN: return False, 'GITEA_TOKEN not set' api_url = f'{constants.GITEA_URL}/api/v1/orgs/{constants.GITEA_ORG}/repos' payload = json.dumps({'name': project_name, 'private': True, 'auto_init': False}).encode() req = urllib.request.Request(api_url, data=payload, method='POST', headers={ 'Authorization': f'token {constants.GITEA_TOKEN}', 'Content-Type': 'application/json', }) try: resp = urllib.request.urlopen(req, timeout=15) resp.read() except urllib.error.HTTPError as e: body = e.read().decode(errors='replace') if e.code == 409: pass # repo already exists — fine else: return False, f'Gitea API error {e.code}: {body}' remote_url = f'{constants.GITEA_URL}/{constants.GITEA_ORG}/{project_name}.git' auth_url = remote_url.replace('https://', f'https://gitea:{constants.GITEA_TOKEN}@') subprocess.run( ['git', 'remote', 'add', 'origin', auth_url], cwd=str(project_dir), capture_output=True, text=True, timeout=5, ) return True, f'Created repo {constants.GITEA_ORG}/{project_name}' def handle_workspace_git_push(self): """Push a workspace project to its remote, creating Gitea repo if needed.""" try: data = self._read_post_json() project_name = data.get('project', '') project_dir = self._validate_project(project_name) if not project_dir: self.send_json({'success': False, 'error': 'Invalid project'}, 400) return if not (project_dir / '.git').exists(): self.send_json({'success': False, 'error': 'Not a git repository'}, 400) return created_msg = '' remote_check = self._run_git(project_dir, ['remote', 'get-url', 'origin'], timeout=10) if remote_check.returncode != 0: ok, msg = self._ensure_gitea_remote(project_dir, project_name) if not ok: self.send_json({'success': False, 'error': msg}) return created_msg = msg + '\n' result = self._run_git(project_dir, ['push', '-u', 'origin', 'HEAD'], timeout=60) output = result.stdout + result.stderr if result.returncode == 0: self.send_json({'success': True, 'output': created_msg + (output or 'Pushed successfully')}) else: self.send_json({'success': False, 'error': output or 'Push failed'}) except subprocess.TimeoutExpired: self.send_json({'success': False, 'error': 'Push timeout (60s)'}, 500) except Exception as e: self.send_json({'success': False, 'error': str(e)}, 500)