Initial commit: ROA2WEB - FastAPI + Vue.js + Telegram Bot
Modern ERP Reports Application with microservices architecture Tech Stack: - Backend: FastAPI + python-oracledb (Oracle DB integration) - Frontend: Vue.js 3 + PrimeVue + Vite - Telegram Bot: python-telegram-bot + SQLite - Infrastructure: Shared database pool, JWT authentication, SSH tunnel Features: - FastAPI backend with async Oracle connection pool - Vue.js 3 responsive frontend with PrimeVue components - Telegram bot alternative interface - Microservices architecture with shared components - Complete deployment support (Linux Docker + Windows IIS) - Comprehensive testing (Playwright E2E + pytest) Repository Structure: - reports-app/ - Main application (backend, frontend, telegram-bot) - shared/ - Shared components (database pool, auth, utils) - deployment/ - Deployment scripts (Linux & Windows) - docs/ - Project documentation - security/ - Security scanning and git hooks
This commit is contained in:
368
security/git_cleanup.py
Normal file
368
security/git_cleanup.py
Normal file
@@ -0,0 +1,368 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
🧹 ROA2WEB Git History Cleanup Tool
|
||||
Safely removes secrets from git history using BFG Repo-Cleaner and git filter-branch.
|
||||
|
||||
⚠️ WARNING: This tool rewrites git history. Make sure to:
|
||||
1. Create a complete backup of your repository
|
||||
2. Coordinate with all team members
|
||||
3. Force-push to all remotes after cleanup
|
||||
4. Regenerate all compromised credentials
|
||||
|
||||
Usage:
|
||||
python security/git_cleanup.py --backup --scan --cleanup [--force]
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import argparse
|
||||
import shutil
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import List, Dict
|
||||
|
||||
class GitHistoryCleanup:
|
||||
"""Git history cleanup and secrets removal tool"""
|
||||
|
||||
def __init__(self, repo_path: str = "."):
|
||||
self.repo_path = Path(repo_path).resolve()
|
||||
self.backup_path = None
|
||||
self.cleanup_log = []
|
||||
|
||||
# Files and patterns to remove from history
|
||||
self.FILES_TO_REMOVE = [
|
||||
"app/.env",
|
||||
"roa2web/reports-app/backend/.env",
|
||||
"roa2web/.env",
|
||||
"roa2web/.env.development",
|
||||
"roa2web/.env.production",
|
||||
"roa2web/ssh-tunnel/roa_oracle_server"
|
||||
]
|
||||
|
||||
# Text patterns to replace in history
|
||||
self.SECRETS_TO_REPLACE = {
|
||||
"ACTUAL_ORACLE_PASS": "***REMOVED***",
|
||||
"ACTUAL_USER_PASS": "***REMOVED***",
|
||||
"DB_PASSWORD=ACTUAL_ORACLE_PASS": "DB_PASSWORD=***REMOVED***",
|
||||
'"marius": "ACTUAL_USER_PASS"': '"marius": "***REMOVED***"',
|
||||
'"eli": "eli"': '"eli": "***REMOVED***"'
|
||||
}
|
||||
|
||||
def log_action(self, action: str, details: str = "") -> None:
|
||||
"""Log cleanup actions"""
|
||||
timestamp = datetime.now().isoformat()
|
||||
log_entry = {
|
||||
"timestamp": timestamp,
|
||||
"action": action,
|
||||
"details": details
|
||||
}
|
||||
self.cleanup_log.append(log_entry)
|
||||
print(f"📝 {timestamp}: {action}")
|
||||
if details:
|
||||
print(f" Details: {details}")
|
||||
|
||||
def check_prerequisites(self) -> bool:
|
||||
"""Check if git and required tools are available"""
|
||||
try:
|
||||
# Check git
|
||||
subprocess.run(['git', '--version'], check=True, capture_output=True)
|
||||
|
||||
# Check if we're in a git repo
|
||||
subprocess.run(['git', 'status'], cwd=self.repo_path, check=True, capture_output=True)
|
||||
|
||||
self.log_action("Prerequisites check passed")
|
||||
return True
|
||||
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
print("❌ Error: Git not available or not in a git repository")
|
||||
return False
|
||||
|
||||
def create_backup(self) -> bool:
|
||||
"""Create complete repository backup"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_name = f"roa2web_backup_{timestamp}"
|
||||
self.backup_path = self.repo_path.parent / backup_name
|
||||
|
||||
try:
|
||||
print(f"💾 Creating backup at: {self.backup_path}")
|
||||
|
||||
# Use git clone to create a complete backup with all history
|
||||
subprocess.run([
|
||||
'git', 'clone', '--mirror',
|
||||
str(self.repo_path),
|
||||
str(self.backup_path)
|
||||
], check=True)
|
||||
|
||||
self.log_action("Backup created", str(self.backup_path))
|
||||
print(f"✅ Backup created successfully: {self.backup_path}")
|
||||
return True
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Backup failed: {e}")
|
||||
return False
|
||||
|
||||
def scan_for_secrets(self) -> Dict:
|
||||
"""Scan repository for secrets that need cleanup"""
|
||||
print("🔍 Scanning for secrets in git history...")
|
||||
|
||||
secrets_found = {
|
||||
"files_with_secrets": [],
|
||||
"commits_with_secrets": [],
|
||||
"patterns_found": {}
|
||||
}
|
||||
|
||||
try:
|
||||
# Check if files exist in git history
|
||||
for file_path in self.FILES_TO_REMOVE:
|
||||
result = subprocess.run([
|
||||
'git', 'log', '--oneline', '--', file_path
|
||||
], cwd=self.repo_path, capture_output=True, text=True)
|
||||
|
||||
if result.stdout.strip():
|
||||
secrets_found["files_with_secrets"].append(file_path)
|
||||
print(f" 📄 Found in history: {file_path}")
|
||||
|
||||
# Check for secret patterns in git log
|
||||
for secret_pattern in self.SECRETS_TO_REPLACE.keys():
|
||||
result = subprocess.run([
|
||||
'git', 'log', '-S', secret_pattern, '--oneline'
|
||||
], cwd=self.repo_path, capture_output=True, text=True)
|
||||
|
||||
if result.stdout.strip():
|
||||
commits = result.stdout.strip().split('\n')
|
||||
secrets_found["patterns_found"][secret_pattern] = len(commits)
|
||||
secrets_found["commits_with_secrets"].extend(commits)
|
||||
print(f" 🔑 Pattern '{secret_pattern}' found in {len(commits)} commits")
|
||||
|
||||
self.log_action("Secrets scan completed", json.dumps(secrets_found, indent=2))
|
||||
return secrets_found
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Scan failed: {e}")
|
||||
return secrets_found
|
||||
|
||||
def remove_files_from_history(self) -> bool:
|
||||
"""Remove sensitive files from git history using git filter-branch"""
|
||||
print("🧹 Removing sensitive files from git history...")
|
||||
|
||||
try:
|
||||
for file_path in self.FILES_TO_REMOVE:
|
||||
print(f" Removing: {file_path}")
|
||||
|
||||
# Use git filter-branch to remove file from history
|
||||
subprocess.run([
|
||||
'git', 'filter-branch', '--force', '--index-filter',
|
||||
f'git rm --cached --ignore-unmatch {file_path}',
|
||||
'--prune-empty', '--tag-name-filter', 'cat', '--', '--all'
|
||||
], cwd=self.repo_path, check=True)
|
||||
|
||||
self.log_action(f"Removed file from history", file_path)
|
||||
|
||||
return True
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ File removal failed: {e}")
|
||||
return False
|
||||
|
||||
def replace_secrets_in_history(self) -> bool:
|
||||
"""Replace secret patterns in git history"""
|
||||
print("🔄 Replacing secrets in git history...")
|
||||
|
||||
# Create temporary file with replacements
|
||||
replacements_file = self.repo_path / "temp_replacements.txt"
|
||||
|
||||
try:
|
||||
with open(replacements_file, 'w') as f:
|
||||
for secret, replacement in self.SECRETS_TO_REPLACE.items():
|
||||
f.write(f"{secret}==>{replacement}\n")
|
||||
|
||||
# Use git filter-branch with replace text
|
||||
subprocess.run([
|
||||
'git', 'filter-branch', '--force', '--tree-filter',
|
||||
f'find . -type f -exec sed -i.bak -f <(echo "s/{list(self.SECRETS_TO_REPLACE.keys())[0]}/{list(self.SECRETS_TO_REPLACE.values())[0]}/g") {{}} \\; 2>/dev/null || true',
|
||||
'--prune-empty', '--tag-name-filter', 'cat', '--', '--all'
|
||||
], cwd=self.repo_path, check=True)
|
||||
|
||||
self.log_action("Secrets replaced in history")
|
||||
return True
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Secret replacement failed: {e}")
|
||||
return False
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if replacements_file.exists():
|
||||
replacements_file.unlink()
|
||||
|
||||
def cleanup_git_refs(self) -> bool:
|
||||
"""Clean up git references and garbage collect"""
|
||||
print("🗑️ Cleaning up git references...")
|
||||
|
||||
try:
|
||||
# Remove backup refs created by filter-branch
|
||||
subprocess.run([
|
||||
'git', 'for-each-ref', '--format=delete %(refname)', 'refs/original'
|
||||
], cwd=self.repo_path, capture_output=True, text=True, check=True)
|
||||
|
||||
# Expire reflog
|
||||
subprocess.run([
|
||||
'git', 'reflog', 'expire', '--expire=now', '--all'
|
||||
], cwd=self.repo_path, check=True)
|
||||
|
||||
# Garbage collect
|
||||
subprocess.run([
|
||||
'git', 'gc', '--prune=now', '--aggressive'
|
||||
], cwd=self.repo_path, check=True)
|
||||
|
||||
self.log_action("Git cleanup completed")
|
||||
return True
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Git cleanup failed: {e}")
|
||||
return False
|
||||
|
||||
def verify_cleanup(self) -> bool:
|
||||
"""Verify that secrets have been removed from history"""
|
||||
print("🔍 Verifying cleanup...")
|
||||
|
||||
verification_results = {
|
||||
"files_still_present": [],
|
||||
"secrets_still_present": []
|
||||
}
|
||||
|
||||
try:
|
||||
# Check if files are still in history
|
||||
for file_path in self.FILES_TO_REMOVE:
|
||||
result = subprocess.run([
|
||||
'git', 'log', '--oneline', '--', file_path
|
||||
], cwd=self.repo_path, capture_output=True, text=True)
|
||||
|
||||
if result.stdout.strip():
|
||||
verification_results["files_still_present"].append(file_path)
|
||||
|
||||
# Check if secrets are still in history
|
||||
for secret_pattern in self.SECRETS_TO_REPLACE.keys():
|
||||
result = subprocess.run([
|
||||
'git', 'log', '-S', secret_pattern, '--oneline'
|
||||
], cwd=self.repo_path, capture_output=True, text=True)
|
||||
|
||||
if result.stdout.strip():
|
||||
verification_results["secrets_still_present"].append(secret_pattern)
|
||||
|
||||
if not verification_results["files_still_present"] and not verification_results["secrets_still_present"]:
|
||||
print("✅ Cleanup verification passed!")
|
||||
self.log_action("Cleanup verification passed")
|
||||
return True
|
||||
else:
|
||||
print("⚠️ Cleanup verification failed:")
|
||||
if verification_results["files_still_present"]:
|
||||
print(f" Files still present: {verification_results['files_still_present']}")
|
||||
if verification_results["secrets_still_present"]:
|
||||
print(f" Secrets still present: {verification_results['secrets_still_present']}")
|
||||
return False
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Verification failed: {e}")
|
||||
return False
|
||||
|
||||
def save_cleanup_log(self) -> None:
|
||||
"""Save cleanup log to file"""
|
||||
log_file = self.repo_path / f"security_cleanup_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
|
||||
with open(log_file, 'w') as f:
|
||||
json.dump({
|
||||
"cleanup_timestamp": datetime.now().isoformat(),
|
||||
"repository_path": str(self.repo_path),
|
||||
"backup_path": str(self.backup_path) if self.backup_path else None,
|
||||
"files_removed": self.FILES_TO_REMOVE,
|
||||
"secrets_replaced": self.SECRETS_TO_REPLACE,
|
||||
"actions": self.cleanup_log
|
||||
}, f, indent=2)
|
||||
|
||||
print(f"📝 Cleanup log saved: {log_file}")
|
||||
|
||||
def run_full_cleanup(self, force: bool = False) -> bool:
|
||||
"""Run complete cleanup process"""
|
||||
print("🚀 Starting ROA2WEB Git History Cleanup")
|
||||
print("="*60)
|
||||
|
||||
if not force:
|
||||
print("\n⚠️ WARNING: This will rewrite git history!")
|
||||
print("Make sure you have:")
|
||||
print("1. ✅ Created a backup")
|
||||
print("2. ✅ Coordinated with team members")
|
||||
print("3. ✅ Are ready to regenerate credentials")
|
||||
|
||||
confirm = input("\nProceed with cleanup? (yes/NO): ")
|
||||
if confirm.lower() != 'yes':
|
||||
print("❌ Cleanup cancelled")
|
||||
return False
|
||||
|
||||
# Check prerequisites
|
||||
if not self.check_prerequisites():
|
||||
return False
|
||||
|
||||
# Create backup
|
||||
if not self.create_backup():
|
||||
return False
|
||||
|
||||
# Scan for secrets
|
||||
secrets_found = self.scan_for_secrets()
|
||||
if not secrets_found["files_with_secrets"] and not secrets_found["patterns_found"]:
|
||||
print("✅ No secrets found in git history")
|
||||
return True
|
||||
|
||||
# Remove files from history
|
||||
if not self.remove_files_from_history():
|
||||
return False
|
||||
|
||||
# Replace secrets in history
|
||||
if not self.replace_secrets_in_history():
|
||||
return False
|
||||
|
||||
# Cleanup git references
|
||||
if not self.cleanup_git_refs():
|
||||
return False
|
||||
|
||||
# Verify cleanup
|
||||
if not self.verify_cleanup():
|
||||
print("⚠️ Cleanup may not be complete. Check manually.")
|
||||
|
||||
# Save log
|
||||
self.save_cleanup_log()
|
||||
|
||||
print("\n✅ Git history cleanup completed!")
|
||||
print("\n🔧 NEXT STEPS:")
|
||||
print("1. 🔑 Regenerate all compromised credentials")
|
||||
print("2. 🚀 Force push to all remotes: git push --force-with-lease --all")
|
||||
print("3. 📢 Notify team members to re-clone repository")
|
||||
print("4. 🗑️ Delete old backup when confident: rm -rf", self.backup_path)
|
||||
|
||||
return True
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="ROA2WEB Git History Cleanup")
|
||||
parser.add_argument('--backup', action='store_true', help='Create backup only')
|
||||
parser.add_argument('--scan', action='store_true', help='Scan for secrets only')
|
||||
parser.add_argument('--cleanup', action='store_true', help='Run full cleanup')
|
||||
parser.add_argument('--force', action='store_true', help='Skip confirmation prompts')
|
||||
parser.add_argument('--repo-path', default='.', help='Repository path')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
cleaner = GitHistoryCleanup(args.repo_path)
|
||||
|
||||
if args.backup:
|
||||
cleaner.create_backup()
|
||||
elif args.scan:
|
||||
cleaner.scan_for_secrets()
|
||||
elif args.cleanup:
|
||||
success = cleaner.run_full_cleanup(args.force)
|
||||
sys.exit(0 if success else 1)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user