#!/usr/bin/env python3
"""
🔒 ROA2WEB Secrets Scanner

Advanced secrets detection tool for preventing credential leaks in git
repositories.

Usage:
    python security/secrets_scanner.py [--scan-git-history]
                                       [--save-report FILE]
                                       [--repo-path PATH] [--verbose]

Features:
- Scans current files for secrets and credentials
- Optional git history scanning for historical leaks
- Pattern-based detection with high accuracy
- Integration ready for git hooks

NOTE: automated .gitignore fixes (a ``--fix-gitignore`` option) were
advertised here previously, but no such option is defined by the argument
parser in this file.

Exit codes (see ``main``): 0 = no violations, 1 = at least one CRITICAL
violation, 2 = violations found but none CRITICAL.
"""

import os
import re
import sys
import subprocess
import argparse
import json
from collections import Counter
from pathlib import Path
from typing import List, Dict, Set, Tuple, Iterator
from dataclasses import dataclass, asdict
from datetime import datetime


@dataclass
class SecurityViolation:
    """Represents a detected security violation"""
    # Path of the offending file relative to the scanned repository, or the
    # literal "git_history" for findings that come from commit diffs.
    file_path: str
    # 1-based line number of the match; 0 for filename-only findings.
    line_number: int
    # Matched line (truncated to 100 characters plus "...") or a description.
    content: str
    # Key of the pattern that matched, or "suspicious_filename".
    pattern_name: str
    # "CRITICAL" or "HIGH".
    severity: str
    # Commit hash for git-history findings; empty for working-tree findings.
    commit_hash: str = ""


class SecretsScanner:
    """Advanced secrets detection scanner"""

    # Critical patterns for secrets detection
    CRITICAL_PATTERNS = {
        'oracle_password': r'ORACLE_PASSWORD\s*=\s*[\'"]([^\'"\s]+)[\'"]',
        'user_passwords': r'VALID_USERS\s*=\s*[\'"](\{[^}]*password[^}]*\})[\'"]',
        'jwt_secret': r'JWT_SECRET[_KEY]*\s*=\s*[\'"]([^\'"\s]+)[\'"]',
        'database_dsn': r'DSN\s*=\s*[\'"]([^\'"\s]+)[\'"]',
        'api_key': r'API[_-]?KEY\s*=\s*[\'"]([^\'"\s]{20,})[\'"]',
        'ssh_private_key': r'-----BEGIN [A-Z ]*PRIVATE KEY-----',
        'aws_access_key': r'AKIA[0-9A-Z]{16}',
        'generic_password': r'(?i)(password|passwd|pwd)\s*[:=]\s*[\'"]([^\'"\s]{4,})[\'"]',
        'connection_string': r'(?i)(server|host|endpoint)=[^;]+;.*password=[^;]+',
        'bearer_token': r'Bearer\s+[A-Za-z0-9\-._~+/]+=*',
    }

    # Suspicious file patterns
    SUSPICIOUS_FILES = {
        r'.*\.env(?!\.example)$': 'Environment file',
        r'.*_rsa$': 'SSH private key',
        r'.*\.pem$': 'PEM certificate/key',
        r'.*\.key$': 'Key file',
        r'.*secret.*': 'Secret file',
        r'.*credential.*': 'Credential file',
        r'.*password.*': 'Password file',
        r'.*config\.prod.*': 'Production config',
    }

    # Safe file extensions to skip
    SAFE_EXTENSIONS = {
        '.md', '.txt', '.rst', '.pdf', '.png', '.jpg', '.jpeg', '.gif',
        '.svg', '.ico', '.mp4', '.avi', '.zip', '.tar', '.gz', '.json',
        '.xml', '.css', '.scss', '.less', '.html', '.js', '.ts'
    }

    # Content patterns reported as CRITICAL; every other pattern is HIGH.
    CRITICAL_PATTERN_NAMES = frozenset(
        {'oracle_password', 'user_passwords', 'ssh_private_key'}
    )

    # VCS metadata directories that are never descended into. Matched by
    # exact name so that look-alikes such as ".github" are still scanned.
    SKIPPED_DIRS = frozenset({'.git', '.svn', '.hg'})

    # Longest excerpt of a matching line kept in a violation before "...".
    MAX_CONTENT_LENGTH = 100

    def __init__(self, repo_path: str = "."):
        """Create a scanner rooted at *repo_path* (default: current directory)."""
        self.repo_path = Path(repo_path)
        self.violations: List[SecurityViolation] = []
        self.scanned_files = 0
        self.start_time = datetime.now()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _iter_files(self) -> Iterator[Path]:
        """Yield every file under the repository, pruning VCS directories."""
        for root, dirs, files in os.walk(self.repo_path):
            # In-place assignment is what tells os.walk not to descend.
            dirs[:] = [d for d in dirs if d not in self.SKIPPED_DIRS]
            for name in files:
                yield Path(root) / name

    def _relative_path(self, file_path: Path) -> str:
        """Return *file_path* relative to the repository when possible."""
        try:
            return str(file_path.relative_to(self.repo_path))
        except ValueError:
            # Path lies outside the repository root; report it unchanged.
            return str(file_path)

    def _matching_patterns(self, text: str) -> Iterator[str]:
        """Yield the name of each critical pattern found in *text*."""
        for pattern_name, pattern in self.CRITICAL_PATTERNS.items():
            # All patterns are applied case-insensitively.
            if re.search(pattern, text, re.IGNORECASE):
                yield pattern_name

    @classmethod
    def _truncate(cls, text: str) -> str:
        """Shorten *text* to MAX_CONTENT_LENGTH characters, adding "..."."""
        if len(text) > cls.MAX_CONTENT_LENGTH:
            return text[:cls.MAX_CONTENT_LENGTH] + "..."
        return text

    @staticmethod
    def _print_violation_group(header: str,
                               group: List[SecurityViolation]) -> None:
        """Print *header* followed by each violation; silent if empty."""
        if not group:
            return
        print(header)
        for v in group:
            print(f" File: {v.file_path}:{v.line_number}")
            print(f" Type: {v.pattern_name}")
            print(f" Content: {v.content}")
            if v.commit_hash:
                print(f" Commit: {v.commit_hash}")
            print()

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------

    def scan_file_content(self, file_path: Path) -> List[SecurityViolation]:
        """Scan file content for secrets patterns.

        Files whose extension is in SAFE_EXTENSIONS are skipped, and files
        that cannot be opened or read are skipped silently (best effort).

        Returns:
            One violation per (line, matching pattern) pair; a single line
            can therefore produce several violations.
        """
        violations: List[SecurityViolation] = []
        file_path = Path(file_path)

        # Skip binary files and safe extensions
        if file_path.suffix.lower() in self.SAFE_EXTENSIONS:
            return violations

        shown_path = self._relative_path(file_path)
        try:
            # errors='ignore' drops undecodable bytes so binary-ish files do
            # not abort the scan.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line_num, line in enumerate(f, 1):
                    for pattern_name in self._matching_patterns(line):
                        is_critical = pattern_name in self.CRITICAL_PATTERN_NAMES
                        violations.append(SecurityViolation(
                            file_path=shown_path,
                            line_number=line_num,
                            content=self._truncate(line.strip()),
                            pattern_name=pattern_name,
                            severity="CRITICAL" if is_critical else "HIGH"
                        ))
        except (OSError, UnicodeDecodeError):
            pass  # Skip files that can't be read

        return violations

    def scan_file_names(self) -> List[SecurityViolation]:
        """Scan for suspicious file names.

        Each repository-relative path is tested against every
        SUSPICIOUS_FILES pattern; a path matching several patterns yields
        one HIGH violation per pattern.
        """
        violations: List[SecurityViolation] = []
        for file_path in self._iter_files():
            rel_path = self._relative_path(file_path)
            for pattern, description in self.SUSPICIOUS_FILES.items():
                if re.match(pattern, rel_path, re.IGNORECASE):
                    violations.append(SecurityViolation(
                        file_path=rel_path,
                        line_number=0,
                        content=f"Suspicious file: {description}",
                        pattern_name="suspicious_filename",
                        severity="HIGH"
                    ))
        return violations

    def scan_current_files(self) -> None:
        """Scan all current files in repository.

        Filename findings are recorded first, then content findings.
        ``scanned_files`` counts every file visited, including those whose
        content is skipped because of a safe extension.
        """
        # NOTE(review): several message prefixes in this file look like
        # mis-decoded UTF-8 emoji. They are runtime output and are left
        # untouched here - confirm the intended source encoding.
        print("šŸ” Scanning current files for secrets...")

        # Scan file names first
        self.violations.extend(self.scan_file_names())

        # Scan file contents
        for file_path in self._iter_files():
            self.violations.extend(self.scan_file_content(file_path))
            self.scanned_files += 1

        print(f"āœ… Scanned {self.scanned_files} files")

    def scan_git_history(self, max_commits: int = 50) -> None:
        """Scan git history for secrets (WARNING: can be slow on large repos).

        Args:
            max_commits: Non-negative number of commits to inspect, taken
                from the start of ``git log --all`` output. Defaults to 50.

        Every added or removed diff line that matches a critical pattern is
        recorded as a CRITICAL violation with ``file_path="git_history"``;
        the line number is the position within the ``git show`` output.
        If git is missing or the directory is not a repository, a warning is
        printed and nothing is recorded.
        """
        print("šŸ• Scanning git history for secrets...")

        # Decode leniently: diffs may contain bytes that are not valid UTF-8.
        run_options = dict(
            cwd=self.repo_path,
            capture_output=True,
            encoding='utf-8',
            errors='ignore',
        )

        try:
            # Get all commits
            result = subprocess.run(
                ['git', 'log', '--pretty=format:%H', '--all'],
                check=True,
                **run_options
            )
            commits = result.stdout.strip().split('\n')[:max_commits]

            for commit in commits:
                if not commit:
                    continue

                content_result = subprocess.run(
                    ['git', 'show', commit],
                    **run_options
                )
                if content_result.returncode != 0:
                    continue

                lines = content_result.stdout.split('\n')
                for line_num, line in enumerate(lines, 1):
                    # Only check added/removed lines
                    if not line.startswith(('+', '-')):
                        continue
                    for pattern_name in self._matching_patterns(line):
                        self.violations.append(SecurityViolation(
                            file_path="git_history",
                            line_number=line_num,
                            content=self._truncate(line),
                            pattern_name=pattern_name,
                            severity="CRITICAL",
                            commit_hash=commit
                        ))
        except (subprocess.CalledProcessError, OSError):
            # CalledProcessError: git ran but failed (e.g. not a repository).
            # OSError: the git executable could not be started at all.
            print("āš ļø Could not scan git history (not a git repo or git not available)")

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def generate_report(self) -> Dict:
        """Generate comprehensive security report.

        Returns:
            A JSON-serialisable dict with the scan timestamp, repository
            path, a summary of counts, per-pattern counts and the full list
            of violations as plain dicts.
        """
        by_severity = Counter(v.severity for v in self.violations)
        by_pattern = Counter(v.pattern_name for v in self.violations)

        return {
            'scan_timestamp': self.start_time.isoformat(),
            'repository_path': str(self.repo_path),
            'summary': {
                'total_violations': len(self.violations),
                'critical_violations': by_severity["CRITICAL"],
                'high_violations': by_severity["HIGH"],
                'files_scanned': self.scanned_files
            },
            # Plain dict, keyed in first-seen order.
            'violations_by_type': dict(by_pattern),
            'violations': [asdict(v) for v in self.violations]
        }

    def print_report(self) -> None:
        """Print formatted security report"""
        report = self.generate_report()
        summary = report['summary']

        print("\n" + "="*80)
        print("šŸ”’ ROA2WEB SECURITY SCAN REPORT")
        print("="*80)
        print(f"šŸ“… Scan Date: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"šŸ“ Repository: {self.repo_path}")
        print(f"šŸ“Š Files Scanned: {self.scanned_files}")

        print("\nšŸ“ˆ SUMMARY:")
        print(f" 🚨 Total Violations: {summary['total_violations']}")
        print(f" šŸ’€ Critical: {summary['critical_violations']}")
        print(f" āš ļø High: {summary['high_violations']}")

        if summary['total_violations'] == 0:
            print("\nāœ… NO SECURITY VIOLATIONS FOUND!")
            return

        print("\nšŸ” VIOLATIONS BY PATTERN:")
        for pattern, count in report['violations_by_type'].items():
            print(f" {pattern}: {count}")

        print("\nšŸ“‹ DETAILED VIOLATIONS:")
        print("-" * 80)

        # Group by severity
        critical = [v for v in self.violations if v.severity == "CRITICAL"]
        high = [v for v in self.violations if v.severity == "HIGH"]

        self._print_violation_group("\nšŸ’€ CRITICAL VIOLATIONS:", critical)
        self._print_violation_group("\nāš ļø HIGH VIOLATIONS:", high)

    def save_report(self, output_file: str = "security_scan_report.json") -> None:
        """Save report to JSON file"""
        report = self.generate_report()
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)
        print(f"šŸ’¾ Report saved to: {output_file}")


def main():
    """Command-line entry point.

    Runs the scan, prints the report and exits with 1 when any CRITICAL
    violation exists, 2 when only other violations exist, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="ROA2WEB Secrets Scanner")
    parser.add_argument('--scan-git-history', action='store_true',
                        help='Scan git history for secrets (slow)')
    parser.add_argument('--save-report', metavar='FILE',
                        help='Save report to JSON file')
    parser.add_argument('--repo-path', default='.',
                        help='Repository path to scan')
    # NOTE: --verbose is accepted but nothing in this file reads it.
    parser.add_argument('--verbose', action='store_true',
                        help='Verbose output')

    args = parser.parse_args()

    scanner = SecretsScanner(args.repo_path)

    # Scan current files
    scanner.scan_current_files()

    # Optionally scan git history
    if args.scan_git_history:
        scanner.scan_git_history()

    # Print report
    scanner.print_report()

    # Save report if requested
    if args.save_report:
        scanner.save_report(args.save_report)

    # Exit with error code if violations found
    critical_count = sum(1 for v in scanner.violations if v.severity == "CRITICAL")

    if critical_count > 0:
        print(f"\nāŒ CRITICAL VIOLATIONS FOUND: {critical_count}")
        print("šŸ”§ Action Required: Remove secrets and regenerate credentials!")
        sys.exit(1)
    elif len(scanner.violations) > 0:
        print(f"\nāš ļø SECURITY WARNINGS: {len(scanner.violations)}")
        print("šŸ”§ Recommended: Review and fix violations")
        sys.exit(2)
    else:
        print("\nāœ… Security scan passed!")
        sys.exit(0)


if __name__ == "__main__":
    main()