#!/usr/bin/env python3
"""
ANAF Monitor v2.2 - Hash detection + version extraction + text diff
- Hash-based change detection (catches ANY change)
- Extracts ALL soft A/J versions from page
- Saves page text and shows diff on changes
"""

import json
import re
import hashlib
import urllib.request
import ssl
import sys
import difflib
from datetime import datetime
from pathlib import Path
from html.parser import HTMLParser

SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
VERSIONS_FILE = SCRIPT_DIR / "versions.json"
HASHES_FILE = SCRIPT_DIR / "hashes.json"
SNAPSHOTS_DIR = SCRIPT_DIR / "snapshots"
LOG_FILE = SCRIPT_DIR / "monitor.log"
DASHBOARD_STATUS = SCRIPT_DIR.parent.parent / "dashboard" / "status.json"

# Ensure snapshots directory exists
SNAPSHOTS_DIR.mkdir(exist_ok=True)

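# Expected config.json shape, inferred from check_page() below (the example
# values are hypothetical):
# {
#   "pages": [
#     {"id": "d394", "name": "Declaratia 394",
#      "url": "https://www.anaf.ro/..."}
#   ]
# }

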
class TextExtractor(HTMLParser):
    """Extract visible text from HTML"""

    def __init__(self):
        super().__init__()
        self.text = []
        self.skip_tags = {'script', 'style', 'head', 'meta', 'link'}
        self.current_tag = None

    def handle_starttag(self, tag, attrs):
        self.current_tag = tag.lower()

    def handle_endtag(self, tag):
        self.current_tag = None

    def handle_data(self, data):
        # Only the most recently opened tag is tracked (no tag stack); this is
        # enough to skip script/style bodies, which contain no nested tags.
        if self.current_tag not in self.skip_tags:
            text = data.strip()
            if text:
                self.text.append(text)

    def get_text(self):
        return '\n'.join(self.text)


def html_to_text(html):
    """Convert HTML to plain text"""
    parser = TextExtractor()
    try:
        parser.feed(html)
        return parser.get_text()
    except Exception:
        # Fallback: just strip tags
        return re.sub(r'<[^>]+>', ' ', html)


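# NOTE: the context below disables TLS certificate verification entirely;
# presumably the monitored ANAF pages fail default verification.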
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE


def log(msg):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as f:
        f.write(f"[{timestamp}] {msg}\n")


def load_json(path, default=None):
    try:
        with open(path) as f:
            return json.load(f)
    except Exception:
        return default if default is not None else {}


def save_json(path, data):
    with open(path, "w") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def fetch_page(url, timeout=30):
    try:
        req = urllib.request.Request(url, headers={
            'User-Agent': 'Mozilla/5.0 (compatible; ANAF-Monitor/2.2)'
        })
        with urllib.request.urlopen(req, timeout=timeout, context=SSL_CTX) as resp:
            return resp.read()
    except Exception as e:
        log(f"ERROR fetching {url}: {e}")
        return None


def compute_hash(content):
    """Compute SHA256 hash of content"""
    return hashlib.sha256(content).hexdigest()


def load_snapshot(page_id):
    """Load previous page text snapshot"""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    try:
        return snapshot_file.read_text(encoding='utf-8')
    except Exception:
        return None


def save_snapshot(page_id, text):
    """Save page text snapshot"""
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    snapshot_file.write_text(text, encoding='utf-8')


def generate_diff(old_text, new_text, context_lines=3):
    """Generate unified diff between old and new text"""
    if not old_text:
        return None

    old_lines = old_text.splitlines(keepends=True)
    new_lines = new_text.splitlines(keepends=True)

    diff = list(difflib.unified_diff(
        old_lines, new_lines,
        fromfile='before',
        tofile='after',
        n=context_lines
    ))

    if not diff:
        return None

    # Limit the diff to at most 50 output lines
    if len(diff) > 50:
        diff = diff[:50] + ['... (truncated)\n']

    return ''.join(diff)


def parse_date_from_filename(filename):
    """Extract the date from a filename (e.g. D394_26092025.pdf -> 26.09.2025)"""
    # Pattern: _DDMMYYYY. or _DDMMYYYY_ or _YYYYMMDD
    match = re.search(r'_(\d{8})[\._]', filename)
    if match:
        d = match.group(1)
        # Decide whether it is DDMMYYYY or YYYYMMDD
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.{d[4:]}"
        elif int(d[4:6]) <= 12 and int(d[6:]) <= 31:
            return f"{d[6:]}.{d[4:6]}.{d[:4]}"

    # Pattern: _DDMMYY
    match = re.search(r'_(\d{6})[\._]', filename)
    if match:
        d = match.group(1)
        if int(d[:2]) <= 31 and int(d[2:4]) <= 12:
            return f"{d[:2]}.{d[2:4]}.20{d[4:]}"

    return None

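# Examples (filenames other than the docstring's D394_26092025.pdf are
# hypothetical):
#   parse_date_from_filename("D394_26092025.pdf") -> "26.09.2025"  (DDMMYYYY)
#   parse_date_from_filename("D112_20250926.pdf") -> "26.09.2025"  (YYYYMMDD)
#   parse_date_from_filename("D300_260925.zip")   -> "26.09.2025"  (DDMMYY)

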
def extract_versions(html):
    """Extract soft A/J from HTML - the first generic link plus all labeled ones (S1002, etc.)"""
    versions = {}

    # Find the FIRST soft A link (PDF) - the current version
    soft_a_match = re.search(
        r'<a[^>]+href=["\']([^"\']*\.pdf)["\'][^>]*>\s*soft\s*A\s*</a>',
        html, re.IGNORECASE
    )
    if soft_a_match:
        url = soft_a_match.group(1)
        versions['soft_a_url'] = url
        date = parse_date_from_filename(url)
        if date:
            versions['soft_a_date'] = date

    # Find ALL labeled soft J links (e.g. "soft J - S1002")
    soft_j_labeled = re.findall(
        r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J\s*-\s*([^<]+)',
        html, re.IGNORECASE
    )

    if soft_j_labeled:
        # Page with named soft packages (the balance-sheet page)
        for url, label in soft_j_labeled:
            label = label.strip()
            key = f'soft_j_{label.replace(" ", "_")}'
            versions[f'{key}_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions[f'{key}_date'] = date
    else:
        # Page with a plain soft J - take only the first one
        soft_j_match = re.search(
            r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J',
            html, re.IGNORECASE
        )
        if soft_j_match:
            url = soft_j_match.group(1)
            versions['soft_j_url'] = url
            date = parse_date_from_filename(url)
            if date:
                versions['soft_j_date'] = date

    # Find the publication date in the page text
    publish_match = re.search(
        r'publicat\s+[îi]n\s*(?:data\s+de\s*)?(\d{2}[./]\d{2}[./]\d{4})',
        html, re.IGNORECASE
    )
    if publish_match:
        versions['published'] = publish_match.group(1).replace('/', '.')

    return versions

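# For a page with a "soft A" PDF and a labeled "soft J - S1002" ZIP, the result
# might look like (hypothetical values):
#   {'soft_a_url': '.../D394_26092025.pdf', 'soft_a_date': '26.09.2025',
#    'soft_j_S1002_url': '.../S1002_26092025.zip',
#    'soft_j_S1002_date': '26.09.2025', 'published': '26.09.2025'}

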
def compare_versions(old, new):
    """Compare versions and return the differences"""
    changes = []

    # Collect all unique keys
    all_keys = set(old.keys()) | set(new.keys())
    date_keys = sorted([k for k in all_keys if k.endswith('_date') or k == 'published'])

    for key in date_keys:
        old_val = old.get(key)
        new_val = new.get(key)

        # Format the label, e.g. 'soft_j_S1002_date' -> 'Soft J S1002'
        label = key.replace('_date', '').replace('_', ' ').title()

        if new_val and old_val != new_val:
            if old_val:
                changes.append(f"{label}: {old_val} → {new_val}")
            else:
                changes.append(f"{label}: {new_val} (NEW)")

    return changes


def format_current_versions(versions):
    """Format the current versions for output"""
    result = {}
    for key, val in versions.items():
        if key.endswith('_date'):
            label = key.replace('_date', '')
            result[label] = val
    return result


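# Change-detection flow: the raw page hash is compared first (it catches ANY
# change), the extracted versions are diffed for a human-readable summary, and
# a text diff against the saved snapshot is attached when available.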
def check_page(page, saved_versions, saved_hashes):
    """Check one page and return any changes"""
    page_id = page["id"]
    name = page["name"]
    url = page["url"]

    content = fetch_page(url)
    if content is None:
        return None

    # Check the hash first (detects ANY change)
    new_hash = compute_hash(content)
    old_hash = saved_hashes.get(page_id)

    html = content.decode('utf-8', errors='ignore')
    new_text = html_to_text(html)
    new_versions = extract_versions(html)
    old_versions = saved_versions.get(page_id, {})

    # Load the previous snapshot
    old_text = load_snapshot(page_id)

    # First run - initialize state
    if not old_hash:
        log(f"INIT: {page_id}")
        saved_hashes[page_id] = new_hash
        saved_versions[page_id] = new_versions
        save_snapshot(page_id, new_text)
        return None

    # Compare hashes
    hash_changed = new_hash != old_hash

    # Compare versions for the details
    version_changes = compare_versions(old_versions, new_versions)

    # Generate a diff if the page changed
    diff = None
    if hash_changed and old_text:
        diff = generate_diff(old_text, new_text)

    # Update saved state
    saved_hashes[page_id] = new_hash
    saved_versions[page_id] = new_versions
    save_snapshot(page_id, new_text)

    if hash_changed:
        if version_changes:
            log(f"CHANGES in {page_id}: {version_changes}")
        else:
            log(f"HASH CHANGED in {page_id} (no version changes detected)")
            version_changes = ["Page changed (see diff)"]

        result = {
            "id": page_id,
            "name": name,
            "url": url,
            "changes": version_changes,
            "current": format_current_versions(new_versions)
        }

        if diff:
            result["diff"] = diff

        return result

    log(f"OK: {page_id}")
    return None


def update_dashboard_status(has_changes, changes_count):
    """Update status.json for the dashboard"""
    try:
        status = load_json(DASHBOARD_STATUS, {})
        status['anaf'] = {
            'ok': not has_changes,
            'status': 'CHANGES' if has_changes else 'OK',
            'message': f'{changes_count} changes detected' if has_changes else 'No changes detected',
            'lastCheck': datetime.now().strftime('%d %b %Y, %H:%M'),
            'changesCount': changes_count
        }
        save_json(DASHBOARD_STATUS, status)
    except Exception as e:
        log(f"ERROR updating dashboard status: {e}")


def main():
    log("=== Starting ANAF monitor v2.2 ===")

    config = load_json(CONFIG_FILE, {"pages": []})
    saved_versions = load_json(VERSIONS_FILE, {})
    saved_hashes = load_json(HASHES_FILE, {})

    all_changes = []
    for page in config["pages"]:
        result = check_page(page, saved_versions, saved_hashes)
        if result:
            all_changes.append(result)

    save_json(VERSIONS_FILE, saved_versions)
    save_json(HASHES_FILE, saved_hashes)

    # Update dashboard status
    update_dashboard_status(len(all_changes) > 0, len(all_changes))

    log("=== Monitor complete ===")

    print(json.dumps({"changes": all_changes}, ensure_ascii=False, indent=2))
    return len(all_changes)


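# The exit status equals the number of changed pages, so wrapper scripts and
# cron jobs can treat a non-zero exit as "changes detected".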
if __name__ == "__main__":
    sys.exit(main())