stage-1: project bootstrap
Structure, config loader, personality/tools/memory from clawd, venv, 22 tests passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
372
tools/anaf-monitor/monitor_v2.py
Normal file
372
tools/anaf-monitor/monitor_v2.py
Normal file
@@ -0,0 +1,372 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ANAF Monitor v2.2 - Hash detection + version extraction + text diff
|
||||
- Hash-based change detection (catches ANY change)
|
||||
- Extracts ALL soft A/J versions from page
|
||||
- Saves page text and shows diff on changes
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import hashlib
|
||||
import urllib.request
|
||||
import ssl
|
||||
import difflib
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from html.parser import HTMLParser
|
||||
|
||||
# All state lives next to the script so the monitor works from cron
# regardless of the current working directory.
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"      # pages to monitor: [{id, name, url}, ...]
VERSIONS_FILE = SCRIPT_DIR / "versions.json"  # last extracted versions, keyed by page id
HASHES_FILE = SCRIPT_DIR / "hashes.json"      # last SHA-256 per page, keyed by page id
SNAPSHOTS_DIR = SCRIPT_DIR / "snapshots"      # plain-text page snapshots used for diffs
LOG_FILE = SCRIPT_DIR / "monitor.log"
# NOTE(review): assumes repo layout <root>/dashboard/status.json with this
# script at <root>/tools/anaf-monitor/ — confirm if the tree moves.
DASHBOARD_STATUS = SCRIPT_DIR.parent.parent / "dashboard" / "status.json"

# Ensure snapshots directory exists
SNAPSHOTS_DIR.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
class TextExtractor(HTMLParser):
    """Extract visible text from HTML.

    Content inside <script>, <style> and <head> is suppressed. The original
    implementation only remembered the *immediately preceding* start tag, so
    any nested tag inside a skipped section (e.g. <title> within <head>)
    leaked its text; a skip-depth counter fixes that.
    """

    # Container tags whose whole subtree is invisible. meta/link are void
    # elements (they have no end tag), so they must NOT take part in depth
    # tracking or the depth would never return to zero.
    _SKIP_CONTAINERS = ('script', 'style', 'head')

    def __init__(self):
        super().__init__()
        self.text = []
        # Kept for interface compatibility with the previous version.
        self.skip_tags = {'script', 'style', 'head', 'meta', 'link'}
        self.current_tag = None
        self._skip_depth = 0  # >0 while inside a skip-container subtree

    def handle_starttag(self, tag, attrs):
        self.current_tag = tag.lower()
        if self.current_tag in self._SKIP_CONTAINERS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        self.current_tag = None
        # Guard against stray closing tags in malformed HTML.
        if tag.lower() in self._SKIP_CONTAINERS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0 and self.current_tag not in self.skip_tags:
            text = data.strip()
            if text:
                self.text.append(text)

    def get_text(self):
        """Return all collected text fragments joined by newlines."""
        return '\n'.join(self.text)
|
||||
|
||||
|
||||
def html_to_text(html):
    """Convert an HTML document to plain text.

    Parses with TextExtractor; if the parser raises on severely malformed
    markup, falls back to a crude regex tag strip so the monitor still has
    *some* text to diff. The bare ``except:`` was narrowed to ``Exception``
    so KeyboardInterrupt/SystemExit are no longer swallowed.
    """
    parser = TextExtractor()
    try:
        parser.feed(html)
        return parser.get_text()
    except Exception:
        # Fallback: just strip tags
        return re.sub(r'<[^>]+>', ' ', html)
|
||||
|
||||
# TLS context with certificate verification DISABLED.
# NOTE(review): this deliberately trades MITM protection for availability —
# presumably because the monitored site has served broken cert chains.
# The monitor only reads public pages, but confirm this is still intended.
SSL_CTX = ssl.create_default_context()
SSL_CTX.check_hostname = False
SSL_CTX.verify_mode = ssl.CERT_NONE
|
||||
|
||||
def log(msg):
    """Append a timestamped line to the monitor log file.

    The file is opened with an explicit UTF-8 encoding: log messages contain
    Romanian diacritics (e.g. "modificări"), and relying on the locale's
    default encoding could raise UnicodeEncodeError or mangle them.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {msg}\n")
|
||||
|
||||
def load_json(path, default=None):
    """Load JSON from *path*; return *default* (or {}) on any read/parse failure.

    Missing or corrupt state files are expected (first run, interrupted
    write), so failures are deliberately non-fatal. The bare ``except:`` was
    narrowed: OSError covers missing/unreadable files, ValueError covers
    json.JSONDecodeError; anything else (e.g. KeyboardInterrupt) propagates.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        return default if default is not None else {}
|
||||
|
||||
def save_json(path, data):
    """Write *data* as pretty-printed UTF-8 JSON to *path*.

    ``ensure_ascii=False`` keeps Romanian diacritics readable in the state
    files; the explicit ``encoding="utf-8"`` makes that safe regardless of
    the platform's locale encoding (the original could fail on Windows).
    """
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
def fetch_page(url, timeout=30):
    """Download *url* and return the raw response bytes, or None on any error.

    Failures are logged and reported as None so one unreachable page does
    not abort the whole monitoring run.
    """
    try:
        request = urllib.request.Request(
            url,
            headers={'User-Agent': 'Mozilla/5.0 (compatible; ANAF-Monitor/2.1)'},
        )
        with urllib.request.urlopen(request, timeout=timeout, context=SSL_CTX) as response:
            return response.read()
    except Exception as e:
        log(f"ERROR fetching {url}: {e}")
        return None
|
||||
|
||||
def compute_hash(content):
    """Return the hex SHA-256 digest of *content* (bytes)."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
|
||||
|
||||
|
||||
def load_snapshot(page_id):
    """Return the previously saved text snapshot for *page_id*, or None.

    A missing snapshot is normal on a page's first check. The bare
    ``except:`` was narrowed so only expected I/O/decoding failures are
    treated as "no snapshot"; KeyboardInterrupt etc. propagate.
    """
    snapshot_file = SNAPSHOTS_DIR / f"{page_id}.txt"
    try:
        return snapshot_file.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError):
        return None
|
||||
|
||||
|
||||
def save_snapshot(page_id, text):
    """Persist the current text snapshot for *page_id*, replacing any previous one."""
    (SNAPSHOTS_DIR / f"{page_id}.txt").write_text(text, encoding='utf-8')
|
||||
|
||||
|
||||
def generate_diff(old_text, new_text, context_lines=3):
    """Return a unified diff between *old_text* and *new_text*, or None.

    None means there is either no previous text to compare against or the
    two texts are identical. Output is capped at 50 lines so notifications
    stay readable.
    """
    if not old_text:
        return None

    delta = list(difflib.unified_diff(
        old_text.splitlines(keepends=True),
        new_text.splitlines(keepends=True),
        fromfile='anterior',
        tofile='actual',
        n=context_lines,
    ))
    if not delta:
        return None

    # Truncate oversized diffs; the marker line tells the reader it's partial.
    if len(delta) > 50:
        delta = delta[:50] + ['... (truncat)\n']

    return ''.join(delta)
|
||||
|
||||
def parse_date_from_filename(filename):
    """Extract a date from a filename (e.g. D394_26092025.pdf -> "26.09.2025").

    Tries, in order: 8 digits read as DDMMYYYY, then as YYYYMMDD, then
    6 digits read as DDMMYY (century assumed 20xx). The digit run must be
    preceded by '_' and followed by '.' or '_'. Returns "DD.MM.YYYY" or None.
    """
    # Pattern: _DDMMYYYY. or _DDMMYYYY_ or _YYYYMMDD
    eight = re.search(r'_(\d{8})[\._]', filename)
    if eight:
        digits = eight.group(1)
        # Disambiguate DDMMYYYY vs YYYYMMDD by plausibility of day/month.
        if int(digits[:2]) <= 31 and int(digits[2:4]) <= 12:
            return f"{digits[:2]}.{digits[2:4]}.{digits[4:]}"
        if int(digits[4:6]) <= 12 and int(digits[6:]) <= 31:
            return f"{digits[6:]}.{digits[4:6]}.{digits[:4]}"

    # Pattern: _DDMMYY
    six = re.search(r'_(\d{6})[\._]', filename)
    if six:
        digits = six.group(1)
        if int(digits[:2]) <= 31 and int(digits[2:4]) <= 12:
            return f"{digits[:2]}.{digits[2:4]}.20{digits[4:]}"

    return None
|
||||
|
||||
def extract_versions(html):
    """Extract "soft A" / "soft J" download links from a page's HTML.

    Returns a dict of ``*_url`` / ``*_date`` entries (dates parsed from the
    linked filenames) plus, when present, a ``published`` date found in the
    page text. Soft J links may carry a label (e.g. "soft J - S1002"); in
    that case every labeled link is captured, otherwise only the first plain
    "soft J" link is taken.
    """
    versions = {}

    # The FIRST "soft A" (PDF) link is the current version.
    first_pdf = re.search(
        r'<a[^>]+href=["\']([^"\']*\.pdf)["\'][^>]*>\s*soft\s*A\s*</a>',
        html, re.IGNORECASE
    )
    if first_pdf:
        pdf_url = first_pdf.group(1)
        versions['soft_a_url'] = pdf_url
        pdf_date = parse_date_from_filename(pdf_url)
        if pdf_date:
            versions['soft_a_date'] = pdf_date

    # All labeled "soft J - <label>" (ZIP) links — seen on the balance-sheet page.
    labeled_zips = re.findall(
        r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J\s*-\s*([^<]+)',
        html, re.IGNORECASE
    )
    if labeled_zips:
        for zip_url, raw_label in labeled_zips:
            key = f'soft_j_{raw_label.strip().replace(" ", "_")}'
            versions[f'{key}_url'] = zip_url
            zip_date = parse_date_from_filename(zip_url)
            if zip_date:
                versions[f'{key}_date'] = zip_date
    else:
        # Page with a single unlabeled "soft J": take only the first link.
        plain_zip = re.search(
            r'<a[^>]+href=["\']([^"\']*\.zip)["\'][^>]*>\s*soft\s*J',
            html, re.IGNORECASE
        )
        if plain_zip:
            zip_url = plain_zip.group(1)
            versions['soft_j_url'] = zip_url
            zip_date = parse_date_from_filename(zip_url)
            if zip_date:
                versions['soft_j_date'] = zip_date

    # Publication date quoted in the page text ("publicat în data de DD.MM.YYYY").
    published = re.search(
        r'publicat\s+[îi]n\s*(?:data\s+de\s*)?(\d{2}[./]\d{2}[./]\d{4})',
        html, re.IGNORECASE
    )
    if published:
        versions['published'] = published.group(1).replace('/', '.')

    return versions
|
||||
|
||||
def compare_versions(old, new):
    """Describe date changes between two extracted-version dicts.

    Only ``*_date`` keys and ``published`` are compared; a key that appears
    in *new* with a different (or no previous) value produces one
    human-readable change line. Keys that vanished from *new* are ignored.
    """
    changes = []

    # Every date-like key present in either snapshot, in stable order.
    candidate_keys = set(old) | set(new)
    date_keys = sorted(k for k in candidate_keys if k.endswith('_date') or k == 'published')

    for key in date_keys:
        previous = old.get(key)
        current = new.get(key)
        if not current or previous == current:
            continue

        label = key.replace('_date', '').replace('_', ' ').title()
        if previous:
            changes.append(f"{label}: {previous} → {current}")
        else:
            changes.append(f"{label}: {current} (NOU)")

    return changes
|
||||
|
||||
def format_current_versions(versions):
    """Reduce a versions dict to ``{'<name>': date}`` entries for display.

    Keeps only ``*_date`` keys, stripping the suffix; URLs and the
    ``published`` entry are omitted.
    """
    return {
        key.replace('_date', ''): value
        for key, value in versions.items()
        if key.endswith('_date')
    }
|
||||
|
||||
def check_page(page, saved_versions, saved_hashes):
    """Check one monitored page and return a change record, or None.

    *page* is a dict with "id", "name" and "url". Mutates *saved_versions*
    and *saved_hashes* in place (keyed by page id) and rewrites the page's
    text snapshot on every successful fetch — the caller is responsible for
    persisting those dicts afterwards. Returns None when the fetch fails,
    on a page's first-ever check (state is initialized silently), or when
    nothing changed.
    """
    page_id = page["id"]
    name = page["name"]
    url = page["url"]

    content = fetch_page(url)
    if content is None:
        # Fetch failure was already logged; treat as "no change".
        return None

    # 1. Check the hash first (detects ANY change)
    new_hash = compute_hash(content)
    old_hash = saved_hashes.get(page_id)

    html = content.decode('utf-8', errors='ignore')
    new_text = html_to_text(html)
    new_versions = extract_versions(html)
    old_versions = saved_versions.get(page_id, {})

    # Load the previous text snapshot (None on first run)
    old_text = load_snapshot(page_id)

    # First run - initialize state without reporting a change
    if not old_hash:
        log(f"INIT: {page_id}")
        saved_hashes[page_id] = new_hash
        saved_versions[page_id] = new_versions
        save_snapshot(page_id, new_text)
        return None

    # Compare hashes
    hash_changed = new_hash != old_hash

    # Compare extracted versions for human-readable details
    version_changes = compare_versions(old_versions, new_versions)

    # Generate a text diff if something changed
    diff = None
    if hash_changed and old_text:
        diff = generate_diff(old_text, new_text)

    # Update stored state (done before reporting, so state reflects this run)
    saved_hashes[page_id] = new_hash
    saved_versions[page_id] = new_versions
    save_snapshot(page_id, new_text)

    if hash_changed:
        if version_changes:
            log(f"CHANGES in {page_id}: {version_changes}")
        else:
            log(f"HASH CHANGED in {page_id} (no version changes detected)")
            # Fallback summary ("the page changed") when the hash moved but
            # no version line did.
            version_changes = ["Pagina s-a modificat"]

        result = {
            "id": page_id,
            "name": name,
            "url": url,
            "changes": version_changes,
            "current": format_current_versions(new_versions)
        }

        if diff:
            result["diff"] = diff

        return result

    log(f"OK: {page_id}")
    return None
|
||||
|
||||
def update_dashboard_status(has_changes, changes_count, changes_list=None):
    """Write the "anaf" section of the dashboard's status.json (best effort).

    Merges into the existing status file so other sections are preserved.
    Any failure is logged and swallowed: dashboard updates must never break
    the monitoring run itself.
    """
    try:
        anaf_status = {
            'ok': not has_changes,
            'status': 'MODIFICĂRI' if has_changes else 'OK',
            'message': (f'{changes_count} modificări detectate'
                        if has_changes else 'Nicio modificare detectată'),
            'lastCheck': datetime.now().strftime('%d %b %Y, %H:%M'),
            'changesCount': changes_count
        }

        if has_changes and changes_list:
            # Per-page details; at most the first 3 change lines as a summary.
            anaf_status['changes'] = [
                {
                    'name': change.get('name', ''),
                    'url': change.get('url', ''),
                    'summary': (change.get('changes') or [])[:3],
                }
                for change in changes_list
            ]

        status = load_json(DASHBOARD_STATUS, {})
        status['anaf'] = anaf_status
        save_json(DASHBOARD_STATUS, status)
    except Exception as e:
        log(f"ERROR updating dashboard status: {e}")
|
||||
|
||||
def main():
    """Run one monitoring pass over all configured pages.

    Prints a JSON report of detected changes to stdout and returns the
    number of changed pages (used as the process exit code).
    """
    # NOTE(review): banner says v2.1 but the module docstring says v2.2 — reconcile.
    log("=== Starting ANAF monitor v2.1 ===")

    config = load_json(CONFIG_FILE, {"pages": []})
    saved_versions = load_json(VERSIONS_FILE, {})
    saved_hashes = load_json(HASHES_FILE, {})

    all_changes = []
    for page in config["pages"]:
        change = check_page(page, saved_versions, saved_hashes)
        if change:
            all_changes.append(change)

    # Persist the state that check_page mutated in place.
    save_json(VERSIONS_FILE, saved_versions)
    save_json(HASHES_FILE, saved_hashes)

    # Update dashboard status
    update_dashboard_status(len(all_changes) > 0, len(all_changes), all_changes)

    log("=== Monitor complete ===")

    print(json.dumps({"changes": all_changes}, ensure_ascii=False, indent=2))
    return len(all_changes)
|
||||
|
||||
if __name__ == "__main__":
    # Exit code = number of changed pages, so cron wrappers can react to changes.
    exit(main())
|
||||
Reference in New Issue
Block a user