- AGENTS.md, SOUL.md, USER.md, IDENTITY.md - ANAF monitor (declarații fiscale) - Kanban board + Notes UI - Email tools - Memory system
98 lines
2.8 KiB
Python
98 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Archive old Done tasks to monthly archive files.
|
|
Run periodically (heartbeat) to keep tasks.json small.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
TASKS_FILE = Path(__file__).parent / "tasks.json"
|
|
ARCHIVE_DIR = Path(__file__).parent / "archive"
|
|
DAYS_TO_KEEP = 7 # Keep Done tasks for 7 days before archiving
|
|
|
|
def archive_old_tasks():
|
|
if not TASKS_FILE.exists():
|
|
print("No tasks.json found")
|
|
return
|
|
|
|
with open(TASKS_FILE, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
# Find Done column
|
|
done_col = None
|
|
for col in data['columns']:
|
|
if col['id'] == 'done':
|
|
done_col = col
|
|
break
|
|
|
|
if not done_col:
|
|
print("No Done column found")
|
|
return
|
|
|
|
# Calculate cutoff date
|
|
cutoff = (datetime.now() - timedelta(days=DAYS_TO_KEEP)).strftime('%Y-%m-%d')
|
|
|
|
# Separate old and recent tasks
|
|
old_tasks = []
|
|
recent_tasks = []
|
|
|
|
for task in done_col['tasks']:
|
|
completed = task.get('completed', task.get('created', ''))
|
|
if completed and completed < cutoff:
|
|
old_tasks.append(task)
|
|
else:
|
|
recent_tasks.append(task)
|
|
|
|
if not old_tasks:
|
|
print(f"No tasks older than {DAYS_TO_KEEP} days to archive")
|
|
return
|
|
|
|
# Create archive directory
|
|
ARCHIVE_DIR.mkdir(exist_ok=True)
|
|
|
|
# Group old tasks by month
|
|
by_month = {}
|
|
for task in old_tasks:
|
|
completed = task.get('completed', task.get('created', ''))[:7] # YYYY-MM
|
|
if completed not in by_month:
|
|
by_month[completed] = []
|
|
by_month[completed].append(task)
|
|
|
|
# Write to monthly archive files
|
|
for month, tasks in by_month.items():
|
|
archive_file = ARCHIVE_DIR / f"tasks-{month}.json"
|
|
|
|
# Load existing archive
|
|
if archive_file.exists():
|
|
with open(archive_file, 'r') as f:
|
|
archive = json.load(f)
|
|
else:
|
|
archive = {"month": month, "tasks": []}
|
|
|
|
# Add new tasks (avoid duplicates by ID)
|
|
existing_ids = {t['id'] for t in archive['tasks']}
|
|
for task in tasks:
|
|
if task['id'] not in existing_ids:
|
|
archive['tasks'].append(task)
|
|
|
|
# Save archive
|
|
with open(archive_file, 'w') as f:
|
|
json.dump(archive, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"Archived {len(tasks)} tasks to {archive_file.name}")
|
|
|
|
# Update tasks.json with only recent Done tasks
|
|
done_col['tasks'] = recent_tasks
|
|
data['lastUpdated'] = datetime.now().isoformat()
|
|
|
|
with open(TASKS_FILE, 'w') as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"Kept {len(recent_tasks)} recent Done tasks, archived {len(old_tasks)}")
|
|
|
|
if __name__ == "__main__":
|
|
archive_old_tasks()
|