#!/usr/bin/env python3
"""
ANAF Monitor v2.2 - hash detection + version extraction + text diff

- Hash-based change detection (catches ANY change)
- Extracts ALL soft A/J versions from each page
- Saves page text snapshots and shows a diff on changes
"""

import json
import re
import hashlib
import ssl
import sys
import urllib.request
import difflib
from datetime import datetime
from pathlib import Path
from html.parser import HTMLParser

SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
VERSIONS_FILE = SCRIPT_DIR / "versions.json"
HASHES_FILE = SCRIPT_DIR / "hashes.json"
SNAPSHOTS_DIR = SCRIPT_DIR / "snapshots"
LOG_FILE = SCRIPT_DIR / "monitor.log"
DASHBOARD_STATUS = SCRIPT_DIR.parent.parent / "dashboard" / "status.json"

# Ensure snapshots directory exists
SNAPSHOTS_DIR.mkdir(exist_ok=True)
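# For reference, a minimal config.json sketch matching what main() and
# check_page() expect: a "pages" list whose entries carry "id", "name" and
# "url". The URL below is purely illustrative, not a real ANAF endpoint:
#
#   {
#     "pages": [
#       {"id": "d394", "name": "Declaratia 394",
#        "url": "https://example.invalid/anaf/d394"}
#     ]
#   }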
class TextExtractor(HTMLParser):
    """Extract visible text from HTML."""

    # Container tags whose content should never count as visible text.
    SKIP_CONTAINERS = {'script', 'style', 'head'}

    def __init__(self):
        super().__init__()
        self.text = []
        # Depth counter instead of a single current-tag flag, so nested
        # markup inside <script>/<style>/<head> cannot re-enable capture.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag.lower() in self.SKIP_CONTAINERS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag.lower() in self.SKIP_CONTAINERS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth:
            return
        text = data.strip()
        if text:
            self.text.append(text)

    def get_text(self):
        return '\n'.join(self.text)


def html_to_text(html):
    """Convert HTML to plain text."""
    parser = TextExtractor()
    try:
        parser.feed(html)
        return parser.get_text()
    except Exception:
        # Fallback: just strip tags
        return re.sub(r'<[^>]+>', ' ', html)


# NOTE: certificate verification is disabled, so fetches accept any TLS cert.
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE


def log(msg):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as f:
        f.write(f"[{timestamp}] {msg}\n")


def load_json(path, default=None):
    try:
        with open(path) as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        return default if default is not None else {}


def save_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def fetch_page(url, timeout=30):
    try:
        req = urllib.request.Request(url, headers={
            'User-Agent': 'Mozilla/5.0 (compatible; ANAF-Monitor/2.2)'
        })
        with urllib.request.urlopen(req, timeout=timeout, context=SSL_CTX) as resp:
            return resp.read()
    except Exception as e:
        log(f"ERROR fetching {url}: {e}")
        return None


def compute_hash(content):
    """Compute the SHA-256 hash of the raw page content."""
    return hashlib.sha256(content).hexdigest()


def load_snapshot(page_id):
    """Load the previous page-text snapshot, if any."""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    try:
        return snapshot_file.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError):
        return None


def save_snapshot(page_id, text):
    """Save the current page-text snapshot."""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    snapshot_file.write_text(text, encoding='utf-8')


def generate_diff(old_text, new_text, context_lines=3):
    """Generate a unified diff between the old and new page text."""
    if not old_text:
        return None
    old_lines = old_text.splitlines(keepends=True)
    new_lines = new_text.splitlines(keepends=True)
    diff = list(difflib.unified_diff(
        old_lines, new_lines,
        fromfile='anterior', tofile='actual',
        n=context_lines
    ))
    if not diff:
        return None
    # Cap the diff at 50 lines to keep the output readable
    if len(diff) > 50:
        diff = diff[:50] + ['... (truncat)\n']
    return ''.join(diff)


def parse_date_from_filename(filename):
    """Extract the date from a file name (e.g. D394_26092025.pdf -> 26.09.2025)."""
    # Pattern: _DDMMYYYY. or _DDMMYYYY_ or _YYYYMMDD
    match = re.search(r'_(\d{8})[\._]', filename)
    if match:
        d = match.group(1)
        # Decide whether the digits are DDMMYYYY or YYYYMMDD
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.{d[4:]}"
        elif int(d[4:6]) <= 12 and int(d[6:]) <= 31:
            return f"{d[6:]}.{d[4:6]}.{d[:4]}"
    # Pattern: _DDMMYY
    match = re.search(r'_(\d{6})[\._]', filename)
    if match:
        d = match.group(1)
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.20{d[4:]}"
    return None


def extract_versions(html):
    """Extract soft A/J links from the HTML: the first generic link plus
    every labelled one (S1002, etc.)."""
    versions = {}

    # Find the FIRST soft A link (PDF) - the current version
    soft_a_match = re.search(
        r'<a[^>]+href=["\']([^"\']*\.pdf)["\'][^>]*>\s*soft\s*A\s*',
        html, re.IGNORECASE
    )
    if soft_a_match:
        url = soft_a_match.group(1)
        versions['soft_a_url'] = url
        date = parse_date_from_filename(url)
        if date:
            versions['soft_a_date'] = date

    # Find ALL labelled soft J links (e.g. "soft J - S1002")
    soft_j_labeled = re.findall(
        r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J\s*-\s*([^<]+)',
        html, re.IGNORECASE
    )
    if soft_j_labeled:
        # Page with named packages (balance-sheet style)
        for url, label in soft_j_labeled:
            label = label.strip()
            key = f'soft_j_{label.replace(" ", "_")}'
            versions[f'{key}_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions[f'{key}_date'] = date
    else:
        # Page with a plain soft J - take only the first link
        soft_j_match = re.search(
            r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J',
            html, re.IGNORECASE
        )
        if soft_j_match:
            url = soft_j_match.group(1)
            versions['soft_j_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions['soft_j_date'] = date

    # Find the publication date in the page text
    publish_match = re.search(
        r'publicat\s+[îi]n\s*(?:data\s+de\s*)?(\d{2}[./]\d{2}[./]\d{4})',
        html, re.IGNORECASE
    )
    if publish_match:
        versions['published'] = publish_match.group(1).replace('/', '.')

    return versions
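# Illustration of the shape extract_versions() produces, hand-derived from
# the regexes above (file names and paths are invented). Given a page with
#   <a href="/files/D394_26092025.pdf">soft A</a>
#   <a href="/files/bilant_S1002_26092025.zip">soft J - S1002</a>
# the result would be roughly:
#   {'soft_a_url': '/files/D394_26092025.pdf', 'soft_a_date': '26.09.2025',
#    'soft_j_S1002_url': '/files/bilant_S1002_26092025.zip',
#    'soft_j_S1002_date': '26.09.2025'}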
def compare_versions(old, new):
    """Compare version dicts and return the differences."""
    changes = []
    # Collect every unique key
    all_keys = set(old.keys()) | set(new.keys())
    date_keys = sorted([k for k in all_keys if k.endswith('_date') or k == 'published'])
    for key in date_keys:
        old_val = old.get(key)
        new_val = new.get(key)
        # Format the label (e.g. 'soft_a_date' -> 'Soft A')
        label = key.replace('_date', '').replace('_', ' ').title()
        if new_val and old_val != new_val:
            if old_val:
                changes.append(f"{label}: {old_val} → {new_val}")
            else:
                changes.append(f"{label}: {new_val} (NOU)")
    return changes


def format_current_versions(versions):
    """Format the current versions for output."""
    result = {}
    for key, val in versions.items():
        if key.endswith('_date'):
            label = key.replace('_date', '')
            result[label] = val
    return result


def check_page(page, saved_versions, saved_hashes):
    """Check one page and return its changes (or None)."""
    page_id = page["id"]
    name = page["name"]
    url = page["url"]

    content = fetch_page(url)
    if content is None:
        return None

    # Check the hash first (detects ANY change)
    new_hash = compute_hash(content)
    old_hash = saved_hashes.get(page_id)

    html = content.decode('utf-8', errors='ignore')
    new_text = html_to_text(html)
    new_versions = extract_versions(html)
    old_versions = saved_versions.get(page_id, {})

    # Load the previous snapshot
    old_text = load_snapshot(page_id)

    # First run - initialise state
    if not old_hash:
        log(f"INIT: {page_id}")
        saved_hashes[page_id] = new_hash
        saved_versions[page_id] = new_versions
        save_snapshot(page_id, new_text)
        return None

    # Compare hashes
    hash_changed = new_hash != old_hash

    # Compare versions for the details
    version_changes = compare_versions(old_versions, new_versions)

    # Generate a diff if the content changed
    diff = None
    if hash_changed and old_text:
        diff = generate_diff(old_text, new_text)

    # Update saved state
    saved_hashes[page_id] = new_hash
    saved_versions[page_id] = new_versions
    save_snapshot(page_id, new_text)

    if hash_changed:
        if version_changes:
            log(f"CHANGES in {page_id}: {version_changes}")
        else:
            log(f"HASH CHANGED in {page_id} (no version changes detected)")
            version_changes = ["Pagina s-a modificat (vezi diff)"]
        result = {
            "id": page_id,
            "name": name,
            "url": url,
            "changes": version_changes,
            "current": format_current_versions(new_versions)
        }
        if diff:
            result["diff"] = diff
        return result

    log(f"OK: {page_id}")
    return None


def update_dashboard_status(has_changes, changes_count):
    """Update status.json for the dashboard."""
    try:
        status = load_json(DASHBOARD_STATUS, {})
        status['anaf'] = {
            'ok': not has_changes,
            'status': 'MODIFICĂRI' if has_changes else 'OK',
            'message': (f'{changes_count} modificări detectate' if has_changes
                        else 'Nicio modificare detectată'),
            'lastCheck': datetime.now().strftime('%d %b %Y, %H:%M'),
            'changesCount': changes_count
        }
        save_json(DASHBOARD_STATUS, status)
    except Exception as e:
        log(f"ERROR updating dashboard status: {e}")


def main():
    log("=== Starting ANAF monitor v2.2 ===")
    config = load_json(CONFIG_FILE, {"pages": []})
    saved_versions = load_json(VERSIONS_FILE, {})
    saved_hashes = load_json(HASHES_FILE, {})

    all_changes = []
    for page in config["pages"]:
        result = check_page(page, saved_versions, saved_hashes)
        if result:
            all_changes.append(result)

    save_json(VERSIONS_FILE, saved_versions)
    save_json(HASHES_FILE, saved_hashes)

    # Update dashboard status
    update_dashboard_status(len(all_changes) > 0, len(all_changes))

    log("=== Monitor complete ===")
    print(json.dumps({"changes": all_changes}, ensure_ascii=False, indent=2))
    # Exit code = number of changed pages, so callers can test for changes
    return len(all_changes)


if __name__ == "__main__":
    sys.exit(main())
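# Example of the JSON printed on stdout when a page has changed (all field
# values are illustrative; "diff" appears only when a previous snapshot
# existed):
#
#   {
#     "changes": [
#       {
#         "id": "d394",
#         "name": "Declaratia 394",
#         "url": "https://example.invalid/anaf/d394",
#         "changes": ["Soft A: 12.08.2025 → 26.09.2025"],
#         "current": {"soft_a": "26.09.2025"},
#         "diff": "--- anterior\n+++ actual\n..."
#       }
#     ]
#   }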