clawd/tools/anaf-monitor/monitor_v2.py

#!/usr/bin/env python3
"""
ANAF Monitor v2.2 - Hash detection + version extraction + text diff
- Hash-based change detection (catches ANY change)
- Extracts ALL soft A/J versions from page
- Saves page text and shows diff on changes
"""
import json
import re
import hashlib
import urllib.request
import ssl
import difflib
from datetime import datetime
from pathlib import Path
from html.parser import HTMLParser
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
VERSIONS_FILE = SCRIPT_DIR / "versions.json"
HASHES_FILE = SCRIPT_DIR / "hashes.json"
SNAPSHOTS_DIR = SCRIPT_DIR / "snapshots"
LOG_FILE = SCRIPT_DIR / "monitor.log"
DASHBOARD_STATUS = SCRIPT_DIR.parent.parent / "dashboard" / "status.json"
# Ensure snapshots directory exists
SNAPSHOTS_DIR.mkdir(exist_ok=True)
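# Expected config.json shape (illustrative; ids, names and URLs are placeholders):
# {
#   "pages": [
#     {"id": "d394", "name": "Declarația 394", "url": "https://www.anaf.ro/..."}
#   ]
# }
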
class TextExtractor(HTMLParser):
    """Extract visible text from HTML"""

    def __init__(self):
        super().__init__()
        self.text = []
        self.skip_tags = {'script', 'style', 'head', 'meta', 'link'}
        self.current_tag = None

    def handle_starttag(self, tag, attrs):
        self.current_tag = tag.lower()

    def handle_endtag(self, tag):
        self.current_tag = None

    def handle_data(self, data):
        if self.current_tag not in self.skip_tags:
            text = data.strip()
            if text:
                self.text.append(text)

    def get_text(self):
        return '\n'.join(self.text)

def html_to_text(html):
    """Convert HTML to plain text"""
    parser = TextExtractor()
    try:
        parser.feed(html)
        return parser.get_text()
    except Exception:
        # Fallback: just strip tags with a regex
        return re.sub(r'<[^>]+>', ' ', html)

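# Quick sanity check (illustrative input):
#   html_to_text('<p>soft A</p><script>x()</script>')  ->  'soft A'
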
# SSL context with certificate verification disabled (the script accepts any cert)
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE

def log(msg):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as f:
        f.write(f"[{timestamp}] {msg}\n")

def load_json(path, default=None):
    try:
        with open(path) as f:
            return json.load(f)
    except Exception:
        return default if default is not None else {}

def save_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

def fetch_page(url, timeout=30):
    try:
        req = urllib.request.Request(url, headers={
            'User-Agent': 'Mozilla/5.0 (compatible; ANAF-Monitor/2.2)'
        })
        with urllib.request.urlopen(req, timeout=timeout, context=SSL_CTX) as resp:
            return resp.read()
    except Exception as e:
        log(f"ERROR fetching {url}: {e}")
        return None

def compute_hash(content):
    """Compute SHA256 hash of content"""
    return hashlib.sha256(content).hexdigest()

def load_snapshot(page_id):
    """Load previous page text snapshot"""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    try:
        return snapshot_file.read_text(encoding='utf-8')
    except Exception:
        return None

def save_snapshot(page_id, text):
    """Save page text snapshot"""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    snapshot_file.write_text(text, encoding='utf-8')

def generate_diff(old_text, new_text, context_lines=3):
    """Generate unified diff between old and new text"""
    if not old_text:
        return None
    old_lines = old_text.splitlines(keepends=True)
    new_lines = new_text.splitlines(keepends=True)
    diff = list(difflib.unified_diff(
        old_lines, new_lines,
        fromfile='anterior',
        tofile='actual',
        n=context_lines
    ))
    if not diff:
        return None
    # Cap the diff at 50 lines to keep the output readable
    if len(diff) > 50:
        diff = diff[:50] + ['... (truncat)\n']
    return ''.join(diff)

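# Example: generate_diff('versiune veche\n', 'versiune nouă\n') returns roughly:
#   --- anterior
#   +++ actual
#   @@ -1 +1 @@
#   -versiune veche
#   +versiune nouă
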
def parse_date_from_filename(filename):
    """Extract the date from a filename (e.g. D394_26092025.pdf -> 26.09.2025)"""
    # Pattern: _DDMMYYYY. or _DDMMYYYY_ or _YYYYMMDD
    match = re.search(r'_(\d{8})[\._]', filename)
    if match:
        d = match.group(1)
        # Decide whether this is DDMMYYYY or YYYYMMDD
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.{d[4:]}"
        elif int(d[4:6]) <= 12 and int(d[6:]) <= 31:
            return f"{d[6:]}.{d[4:6]}.{d[:4]}"
    # Pattern: _DDMMYY
    match = re.search(r'_(\d{6})[\._]', filename)
    if match:
        d = match.group(1)
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.20{d[4:]}"
    return None

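# A few illustrative calls (filenames are hypothetical):
#   parse_date_from_filename('D394_26092025.pdf')  -> '26.09.2025'  (DDMMYYYY)
#   parse_date_from_filename('soft_20250926.zip')  -> '26.09.2025'  (YYYYMMDD)
#   parse_date_from_filename('d112_260925.zip')    -> '26.09.2025'  (DDMMYY)
#   parse_date_from_filename('notes.pdf')          -> None
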
def extract_versions(html):
    """Extract soft A/J links from HTML - the first generic one plus all labelled ones (S1002, etc.)"""
    versions = {}
    # Find the FIRST soft A link (PDF) - the current version
    soft_a_match = re.search(
        r'<a[^>]+href=["\']([^"\']*\.pdf)["\'][^>]*>\s*soft\s*A\s*</a>',
        html, re.IGNORECASE
    )
    if soft_a_match:
        url = soft_a_match.group(1)
        versions['soft_a_url'] = url
        date = parse_date_from_filename(url)
        if date:
            versions['soft_a_date'] = date
    # Find ALL LABELLED soft J links (e.g. "soft J - S1002")
    soft_j_labeled = re.findall(
        r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J\s*-\s*([^<]+)',
        html, re.IGNORECASE
    )
    if soft_j_labeled:
        # Page with named softs (balance-sheet pages)
        for url, label in soft_j_labeled:
            label = label.strip()
            key = f'soft_j_{label.replace(" ", "_")}'
            versions[f'{key}_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions[f'{key}_date'] = date
    else:
        # Page with a plain soft J link - take only the first one
        soft_j_match = re.search(
            r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J',
            html, re.IGNORECASE
        )
        if soft_j_match:
            url = soft_j_match.group(1)
            versions['soft_j_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions['soft_j_date'] = date
    # Find the publication date in the page text
    publish_match = re.search(
        r'publicat\s+[îi]n\s*(?:data\s+de\s*)?(\d{2}[./]\d{2}[./]\d{4})',
        html, re.IGNORECASE
    )
    if publish_match:
        versions['published'] = publish_match.group(1).replace('/', '.')
    return versions

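# Shape of the returned dict for a page with a labelled soft J (values hypothetical):
#   {'soft_a_url': '.../D394_26092025.pdf', 'soft_a_date': '26.09.2025',
#    'soft_j_S1002_url': '.../S1002_26092025.zip', 'soft_j_S1002_date': '26.09.2025',
#    'published': '26.09.2025'}
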
def compare_versions(old, new):
    """Compare versions and return the differences"""
    changes = []
    # Collect all unique keys
    all_keys = set(old.keys()) | set(new.keys())
    date_keys = sorted([k for k in all_keys if k.endswith('_date') or k == 'published'])
    for key in date_keys:
        old_val = old.get(key)
        new_val = new.get(key)
        # Format the label
        label = key.replace('_date', '').replace('_', ' ').title()
        if new_val and old_val != new_val:
            if old_val:
                changes.append(f"{label}: {old_val} → {new_val}")
            else:
                changes.append(f"{label}: {new_val} (NOU)")
    return changes

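# Example (inputs hypothetical):
#   compare_versions({'soft_a_date': '01.08.2025'},
#                    {'soft_a_date': '26.09.2025', 'published': '26.09.2025'})
#   -> ['Published: 26.09.2025 (NOU)', 'Soft A: 01.08.2025 → 26.09.2025']
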
def format_current_versions(versions):
    """Format the current versions for output"""
    result = {}
    for key, val in versions.items():
        if key.endswith('_date'):
            label = key.replace('_date', '')
            result[label] = val
    return result

def check_page(page, saved_versions, saved_hashes):
    """Check one page and return its changes"""
    page_id = page["id"]
    name = page["name"]
    url = page["url"]
    content = fetch_page(url)
    if content is None:
        return None
    # 1. Check the hash first (detects ANY change)
    new_hash = compute_hash(content)
    old_hash = saved_hashes.get(page_id)
    html = content.decode('utf-8', errors='ignore')
    new_text = html_to_text(html)
    new_versions = extract_versions(html)
    old_versions = saved_versions.get(page_id, {})
    # Load the previous snapshot
    old_text = load_snapshot(page_id)
    # First run - initialise state
    if not old_hash:
        log(f"INIT: {page_id}")
        saved_hashes[page_id] = new_hash
        saved_versions[page_id] = new_versions
        save_snapshot(page_id, new_text)
        return None
    # Compare hashes
    hash_changed = new_hash != old_hash
    # Compare versions for the details
    version_changes = compare_versions(old_versions, new_versions)
    # Generate a diff if the page changed
    diff = None
    if hash_changed and old_text:
        diff = generate_diff(old_text, new_text)
    # Update state
    saved_hashes[page_id] = new_hash
    saved_versions[page_id] = new_versions
    save_snapshot(page_id, new_text)
    if hash_changed:
        if version_changes:
            log(f"CHANGES in {page_id}: {version_changes}")
        else:
            log(f"HASH CHANGED in {page_id} (no version changes detected)")
            version_changes = ["Pagina s-a modificat (vezi diff)"]
        result = {
            "id": page_id,
            "name": name,
            "url": url,
            "changes": version_changes,
            "current": format_current_versions(new_versions)
        }
        if diff:
            result["diff"] = diff
        return result
    log(f"OK: {page_id}")
    return None

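# Example change record returned by check_page (values illustrative):
#   {'id': 'd394', 'name': 'Declarația 394', 'url': 'https://...',
#    'changes': ['Soft A: 01.08.2025 → 26.09.2025'],
#    'current': {'soft_a': '26.09.2025'},
#    'diff': '--- anterior\n+++ actual\n...'}
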
def update_dashboard_status(has_changes, changes_count):
    """Update status.json for the dashboard"""
    try:
        status = load_json(DASHBOARD_STATUS, {})
        status['anaf'] = {
            'ok': not has_changes,
            'status': 'MODIFICĂRI' if has_changes else 'OK',
            'message': f'{changes_count} modificări detectate' if has_changes else 'Nicio modificare detectată',
            'lastCheck': datetime.now().strftime('%d %b %Y, %H:%M'),
            'changesCount': changes_count
        }
        save_json(DASHBOARD_STATUS, status)
    except Exception as e:
        log(f"ERROR updating dashboard status: {e}")

def main():
    log("=== Starting ANAF monitor v2.2 ===")
    config = load_json(CONFIG_FILE, {"pages": []})
    saved_versions = load_json(VERSIONS_FILE, {})
    saved_hashes = load_json(HASHES_FILE, {})
    all_changes = []
    for page in config["pages"]:
        result = check_page(page, saved_versions, saved_hashes)
        if result:
            all_changes.append(result)
    save_json(VERSIONS_FILE, saved_versions)
    save_json(HASHES_FILE, saved_hashes)
    # Update dashboard status
    update_dashboard_status(len(all_changes) > 0, len(all_changes))
    log("=== Monitor complete ===")
    print(json.dumps({"changes": all_changes}, ensure_ascii=False, indent=2))
    return len(all_changes)

if __name__ == "__main__":
    # Exit code equals the number of pages that changed (0 = no changes)
    exit(main())
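
# Typical scheduled run (scheduling is an assumption; any cron-like runner works):
#   */30 * * * * /usr/bin/python3 /path/to/monitor_v2.py
# stdout carries the JSON summary: {"changes": [...]}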