diff --git a/docs/config.md b/docs/config.md
index 7e4c001..f24c5b3 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -277,6 +277,29 @@ The proxy binary is automatically downloaded from [CLIProxyAPI](https://github.c
 | `sessions.max_sessions` | int | `500` | Max active (non-archived) sessions before cleanup |
 | `sessions.cron_session_mode` | string | `per_run` | `per_run` (unique session per cron run) or `reuse` (shared session per job) |
 
+## Retention
+
+Opt-in `nerve.db` maintenance. Disabled by default. When enabled, a background
+pass every `interval_hours` drops the verbose `blocks`/`thinking` JSON of old,
+already-memorized messages (keeping the rendered `content`), prunes append-only
+telemetry and file snapshots older than `retention_days`, and checkpoints the
+WAL. This frees space inside the database but does not shrink the file on disk;
+run `nerve db vacuum` once (with the daemon stopped) to reclaim it.
+
+| Key | Type | Default | Description |
+|-----|------|---------|-------------|
+| `retention.enabled` | bool | `false` | Master switch for the background retention pass |
+| `retention.retention_full_days` | int | `30` | Compact `blocks`/`thinking` of memorized messages older than this (messages in starred or active sessions are never compacted) |
+| `retention.retention_days` | int | `90` | Prune telemetry and file snapshots older than this |
+| `retention.interval_hours` | int | `24` | How often the background pass runs; the first pass runs one full interval after the daemon starts |
+
+Manual commands (run regardless of `enabled`):
+
+- `nerve db prune [--dry-run]` runs one pass immediately. `--dry-run` reports
+  what would change without mutating.
+- `nerve db vacuum` rewrites the file to reclaim freed pages. It takes a write
+  lock, so stop the daemon first.
+
 ## Cron
 
 | Key | Type | Default | Description |
diff --git a/nerve/cli.py b/nerve/cli.py
index 766e544..2e1c1ff 100644
--- a/nerve/cli.py
+++ b/nerve/cli.py
@@ -1560,5 +1560,122 @@ def backfill_timestamps(ctx: click.Context, dry_run: bool) -> None:
     click.echo(f"\n{'Would update' if dry_run else 'Updated'} {updated} items, skipped {skipped}")
 
 
+def _fmt_bytes(n: int) -> str:
+    """Render a byte count as MB with one decimal."""
+    return f"{(n or 0) / (1024 * 1024):.1f} MB"
+
+
+@main.group(name="db")
+def db_group() -> None:
+    """Database maintenance (prune old data, vacuum to reclaim space)."""
+
+
+@db_group.command("prune")
+@click.option("--dry-run", is_flag=True, help="Report what would change without mutating")
+@click.pass_context
+def db_prune(ctx: click.Context, dry_run: bool) -> None:
+    """Compact old memorized messages and prune old telemetry + file snapshots.
+
+    Uses the configured retention windows (retention.retention_full_days for
+    message compaction, retention.retention_days for telemetry/snapshots).
+    Frees space inside the DB; run ``nerve db vacuum`` afterwards to shrink the
+    file on disk.
+    """
+    from nerve.db import Database
+
+    config = ctx.obj["config"]
+    db_path = Path("~/.nerve/nerve.db").expanduser()
+    if not db_path.exists():
+        click.echo(f"[ERR] Database not found: {db_path}")
+        ctx.exit(1)
+        return
+
+    full_days = config.retention.retention_full_days
+    days = config.retention.retention_days
+    click.echo(
+        f"{'[dry-run] ' if dry_run else ''}Pruning {db_path} "
+        f"(compact messages > {full_days}d, telemetry/snapshots > {days}d)"
+    )
+
+    async def _run() -> None:
+        database = Database(db_path, workspace=config.workspace)
+        await database.connect()
+        try:
+            report = await database.run_retention(
+                retention_days=days,
+                retention_full_days=full_days,
+                dry_run=dry_run,
+            )
+        finally:
+            await database.close()
+
+        verb = "Would compact" if dry_run else "Compacted"
+        click.echo(
+            f" {verb} {report['messages_compacted']} messages "
+            f"(~{_fmt_bytes(report['message_bytes_reclaimed'])} of blocks/thinking)"
+        )
+        verb = "Would delete" if dry_run else "Deleted"
+        click.echo(f" {verb} {report['telemetry_deleted']} telemetry rows:")
+        for table, n in report["by_table"].items():
+            click.echo(f" {table}: {n}")
+        click.echo(
+            f" {verb} {report['snapshots_deleted']} file snapshots "
+            f"(~{_fmt_bytes(report['snapshot_bytes_reclaimed'])})"
+        )
+        if not dry_run:
+            click.echo("\nFreed space inside the DB. Run `nerve db vacuum` to shrink the file.")
+
+    asyncio.run(_run())
+
+
+@db_group.command("vacuum")
+@click.pass_context
+def db_vacuum(ctx: click.Context) -> None:
+    """Rewrite the DB file to reclaim freed pages (shrinks the file on disk).
+
+    VACUUM takes a write lock and cannot run while the daemon holds the DB.
+    Stop the daemon first (`nerve stop`) for a clean run.
+    """
+    from nerve.db import Database
+
+    config = ctx.obj["config"]
+    db_path = Path("~/.nerve/nerve.db").expanduser()
+    if not db_path.exists():
+        click.echo(f"[ERR] Database not found: {db_path}")
+        ctx.exit(1)
+        return
+
+    running, _pid = _get_daemon_status()
+    if running:
+        click.echo(
+            "[WARN] Nerve daemon is running. VACUUM needs an exclusive lock and "
+            "will likely fail with 'database is locked'. Run `nerve stop` first."
+        )
+
+    size_before = db_path.stat().st_size
+    click.echo(f"Vacuuming {db_path} ({_fmt_bytes(size_before)})...")
+
+    async def _run() -> None:
+        database = Database(db_path, workspace=config.workspace)
+        await database.connect()
+        try:
+            await database.vacuum()
+        finally:
+            await database.close()
+
+    try:
+        asyncio.run(_run())
+    except Exception as e:
+        click.echo(f"[ERR] VACUUM failed: {e}")
+        ctx.exit(1)
+        return
+
+    size_after = db_path.stat().st_size
+    click.echo(
+        f"Done. {_fmt_bytes(size_before)} -> {_fmt_bytes(size_after)} "
+        f"(reclaimed {_fmt_bytes(size_before - size_after)})"
+    )
+
+
 if __name__ == "__main__":
     main()
diff --git a/nerve/config.py b/nerve/config.py
index 9ea0199..d0c28b3 100644
--- a/nerve/config.py
+++ b/nerve/config.py
@@ -533,6 +533,38 @@ def from_dict(cls, d: dict) -> SessionsConfig:
         )
 
 
+@dataclass
+class RetentionConfig:
+    """Opt-in nerve.db retention: message compaction + telemetry pruning.
+
+    Disabled by default so an upstream merge mutates no existing user's data;
+    the operator opts in locally. When enabled, a background pass every
+    ``interval_hours`` drops the verbose ``blocks``/``thinking`` JSON of old,
+    already-memorized, non-starred, non-active messages (keeping ``content``),
+    prunes append-only telemetry + file snapshots older than
+    ``retention_days``, and checkpoints the WAL. The file is only shrunk by the
+    explicit ``nerve db vacuum`` command (VACUUM takes a write lock).
+
+    ``retention_full_days`` is the message-compaction window (default 30);
+    ``retention_days`` is the telemetry/snapshot window (default 90).
+    ``from_dict`` clamps all three int settings to ``>= 1``.
+    """
+
+    enabled: bool = False
+    retention_days: int = 90
+    retention_full_days: int = 30
+    interval_hours: int = 24
+
+    @classmethod
+    def from_dict(cls, d: dict) -> RetentionConfig:
+        return cls(
+            enabled=bool(d.get("enabled", False)),
+            retention_days=max(1, int(d.get("retention_days", 90))),
+            retention_full_days=max(1, int(d.get("retention_full_days", 30))),
+            interval_hours=max(1, int(d.get("interval_hours", 24))),
+        )
+
+
 @dataclass
 class AuthConfig:
     password_hash: str = ""
@@ -957,6 +989,7 @@ class NerveConfig:
     cron: CronConfig = field(default_factory=CronConfig)
     backup: BackupConfig = field(default_factory=BackupConfig)
     sessions: SessionsConfig = field(default_factory=SessionsConfig)
+    retention: RetentionConfig = field(default_factory=RetentionConfig)
     auth: AuthConfig = field(default_factory=AuthConfig)
     channels: ChannelsConfig = field(default_factory=ChannelsConfig)
     notifications: NotificationsConfig = field(default_factory=NotificationsConfig)
@@ -1073,6 +1106,7 @@ def from_dict(cls, d: dict) -> NerveConfig:
             cron=CronConfig.from_dict(d.get("cron", {})),
             backup=BackupConfig.from_dict(d.get("backup", {})),
             sessions=SessionsConfig.from_dict(d.get("sessions", {})),
+            retention=RetentionConfig.from_dict(d.get("retention", {})),
             auth=AuthConfig.from_dict(d.get("auth", {})),
             channels=ChannelsConfig.from_dict(d.get("channels", {})),
             notifications=NotificationsConfig.from_dict(d.get("notifications", {})),
diff --git a/nerve/db/base.py b/nerve/db/base.py
index 1b080d6..3e6eda5 100644
--- a/nerve/db/base.py
+++ b/nerve/db/base.py
@@ -18,6 +18,7 @@
 from nerve.db.audit import AuditStore
 from nerve.db.cron import CronStore
 from nerve.db.files import FileStore
+from nerve.db.maintenance import MaintenanceStore
 from nerve.db.mcp import McpStore
 from nerve.db.messages import MessageStore
 from nerve.db.migrations.runner import discover_migrations, run_migrations
@@ -53,6 +54,7 @@ class Database(
     UsageStore,
     FileStore,
     WakeupStore,
+    MaintenanceStore,
 ):
     """Async SQLite database wrapper.
 
diff --git a/nerve/db/maintenance.py b/nerve/db/maintenance.py
new file mode 100644
index 0000000..d68d6a7
--- /dev/null
+++ b/nerve/db/maintenance.py
@@ -0,0 +1,226 @@
+"""Database maintenance: message compaction, telemetry pruning, reclaim.
+
+Opt-in retention for ``nerve.db``. The dominant footprint is the verbose
+machine-facing ``blocks``/``thinking`` JSON on old messages, which is safe to
+drop once a message is in memU and no longer rendered live:
+
+* memU extraction reads ``content``, not ``blocks``
+  (``nerve/memory/memu_bridge.py``), gated by the per-session
+  ``last_memorized_at`` watermark (``nerve/agent/engine.py``).
+* SDK resume restores context from the ``.jsonl`` transcript, not DB blocks.
+* The only remaining reader of ``blocks`` is UI rendering of an opened
+  session, which falls back to the kept ``content`` text when blocks is NULL.
+
+So compaction targets messages that are older than the full-retention window,
+already past their session's memorize watermark, in a non-starred session, and
+not the currently connected (``active``) session.
+
+Telemetry tables and file snapshots are append-only and pruned by age.
+
+Reclaim model: nulling columns and deleting rows frees pages to the SQLite
+freelist (reused by later writes) but does not shrink the file.
+``PRAGMA wal_checkpoint(TRUNCATE)`` truncates only the WAL. Only ``VACUUM``
+rewrites and shrinks the main DB file, so it is exposed as an explicit,
+operator-run step (it takes a write lock and cannot run in a transaction).
+"""
+
+from __future__ import annotations
+
+import logging
+from datetime import datetime, timedelta, timezone
+
+logger = logging.getLogger(__name__)
+
+
+# Append-only telemetry tables keyed by their timestamp column. memu_audit_log
+# uses ``timestamp``; the others use ``created_at``. Names are static (never
+# user input), so interpolating them into SQL is safe.
+_TELEMETRY_TABLES: dict[str, str] = {
+    "session_events": "created_at",
+    "mcp_tool_usage": "created_at",
+    "session_usage": "created_at",
+    "memu_audit_log": "timestamp",
+}
+
+# Compaction predicate, shared by the count (dry-run) query and the UPDATE.
+# Single ``?`` param: the full-retention cutoff. ``datetime(...)`` normalizes
+# the mixed stored timestamp formats (space-separated vs ISO-8601) to UTC.
+_COMPACT_WHERE = """
+    (m.blocks IS NOT NULL OR m.thinking IS NOT NULL)
+    AND datetime(m.created_at) < datetime(?)
+    AND s.last_memorized_at IS NOT NULL
+    AND datetime(m.created_at) < datetime(s.last_memorized_at)
+    AND s.starred = 0
+    AND s.status != 'active'
+"""
+
+
+class MaintenanceStore:
+    """Mixin providing opt-in DB retention: compaction, pruning, reclaim."""
+
+    @staticmethod
+    def _cutoff(days: int) -> str:
+        """UTC cutoff ``days`` ago as ``YYYY-MM-DD HH:MM:SS`` (clamped >= 1)."""
+        days = max(1, int(days))
+        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
+        return cutoff.strftime("%Y-%m-%d %H:%M:%S")
+
+    async def compact_messages(
+        self, full_days: int, *, dry_run: bool = False,
+    ) -> dict:
+        """Drop ``blocks``/``thinking`` from old, memorized, inactive messages.
+
+        Keeps ``content``, ``role``, ``created_at``, ``external_id``. Idempotent:
+        rows already NULL on both columns are skipped, so a second pass is a
+        no-op. ``content`` is never touched.
+
+        Returns ``{"messages_compacted": int, "bytes_reclaimed": int}``.
+        ``bytes_reclaimed`` is an estimate (``LENGTH`` of the TEXT JSON).
+        """
+        cutoff = self._cutoff(full_days)
+        async with self.db.execute(
+            f"""SELECT COUNT(*),
+                       COALESCE(SUM(LENGTH(m.blocks)), 0)
+                       + COALESCE(SUM(LENGTH(m.thinking)), 0)
+                FROM messages m
+                JOIN sessions s ON m.session_id = s.id
+                WHERE {_COMPACT_WHERE}""",
+            (cutoff,),
+        ) as cursor:
+            row = await cursor.fetchone()
+        count = row[0] if row else 0
+        reclaimed = (row[1] or 0) if row else 0
+
+        if not dry_run and count:
+            async with self._atomic():
+                await self.db.execute(
+                    f"""UPDATE messages
+                        SET blocks = NULL, thinking = NULL
+                        WHERE id IN (
+                            SELECT m.id FROM messages m
+                            JOIN sessions s ON m.session_id = s.id
+                            WHERE {_COMPACT_WHERE}
+                        )""",
+                    (cutoff,),
+                )
+
+        return {"messages_compacted": count, "bytes_reclaimed": reclaimed}
+
+    async def prune_telemetry(
+        self, days: int, *, dry_run: bool = False,
+    ) -> dict:
+        """Delete telemetry rows older than ``days`` from the append-only tables.
+
+        Returns ``{"telemetry_deleted": int, "by_table": {table: int, ...}}``.
+        """
+        cutoff = self._cutoff(days)
+        by_table: dict[str, int] = {}
+        total = 0
+        for table, ts_col in _TELEMETRY_TABLES.items():
+            async with self.db.execute(
+                f"SELECT COUNT(*) FROM {table} "
+                f"WHERE datetime({ts_col}) < datetime(?)",
+                (cutoff,),
+            ) as cursor:
+                row = await cursor.fetchone()
+            n = row[0] if row else 0
+            by_table[table] = n
+            total += n
+            if not dry_run and n:
+                async with self._atomic():
+                    await self.db.execute(
+                        f"DELETE FROM {table} "
+                        f"WHERE datetime({ts_col}) < datetime(?)",
+                        (cutoff,),
+                    )
+        return {"telemetry_deleted": total, "by_table": by_table}
+
+    async def prune_file_snapshots(
+        self, days: int, *, dry_run: bool = False,
+    ) -> dict:
+        """Delete ``session_file_snapshots`` rows older than ``days``.
+
+        Returns ``{"snapshots_deleted": int, "bytes_reclaimed": int}``.
+        """
+        cutoff = self._cutoff(days)
+        async with self.db.execute(
+            "SELECT COUNT(*), COALESCE(SUM(LENGTH(original_content)), 0) "
+            "FROM session_file_snapshots WHERE datetime(created_at) < datetime(?)",
+            (cutoff,),
+        ) as cursor:
+            row = await cursor.fetchone()
+        count = row[0] if row else 0
+        reclaimed = (row[1] or 0) if row else 0
+
+        if not dry_run and count:
+            async with self._atomic():
+                await self.db.execute(
+                    "DELETE FROM session_file_snapshots "
+                    "WHERE datetime(created_at) < datetime(?)",
+                    (cutoff,),
+                )
+        return {"snapshots_deleted": count, "bytes_reclaimed": reclaimed}
+
+    async def checkpoint(self) -> None:
+        """Truncate the WAL after a prune pass (best-effort).
+
+        Frees the WAL file; does not shrink the main DB (see :meth:`vacuum`).
+        A checkpoint error is logged and swallowed: the prune it follows is
+        already committed, so it must not fail the whole retention pass.
+        """
+        await self.db.commit()
+        try:
+            await self.db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
+        except Exception:
+            logger.warning("WAL checkpoint failed; skipping", exc_info=True)
+
+    async def vacuum(self) -> None:
+        """Rewrite the DB file to reclaim freelist pages (shrinks the file).
+
+        Takes a write lock and cannot run inside a transaction, so it is an
+        explicit operator step, never on the background loop. Run with the
+        daemon stopped to avoid lock contention.
+        """
+        await self.db.commit()
+        await self.db.execute("VACUUM")
+        await self.db.commit()
+
+    async def run_retention(
+        self,
+        *,
+        retention_days: int,
+        retention_full_days: int,
+        dry_run: bool = False,
+    ) -> dict:
+        """Run a full retention pass: compact, prune telemetry + snapshots,
+        then checkpoint the WAL. Does NOT VACUUM (file shrink is operator-run).
+
+        Returns a merged report dict.
+        """
+        compacted = await self.compact_messages(
+            retention_full_days, dry_run=dry_run,
+        )
+        telemetry = await self.prune_telemetry(retention_days, dry_run=dry_run)
+        snapshots = await self.prune_file_snapshots(
+            retention_days, dry_run=dry_run,
+        )
+        # Distinct keys: compaction and snapshot pruning both report
+        # ``bytes_reclaimed`` in isolation, so namespace them here rather than
+        # merging (which would clobber the larger compaction figure).
+        report = {
+            "dry_run": dry_run,
+            "messages_compacted": compacted["messages_compacted"],
+            "message_bytes_reclaimed": compacted["bytes_reclaimed"],
+            "telemetry_deleted": telemetry["telemetry_deleted"],
+            "by_table": telemetry["by_table"],
+            "snapshots_deleted": snapshots["snapshots_deleted"],
+            "snapshot_bytes_reclaimed": snapshots["bytes_reclaimed"],
+        }
+        # Only checkpoint when something actually changed on disk.
+        if not dry_run and (
+            compacted["messages_compacted"]
+            or telemetry["telemetry_deleted"]
+            or snapshots["snapshots_deleted"]
+        ):
+            await self.checkpoint()
+        return report
diff --git a/nerve/gateway/server.py b/nerve/gateway/server.py
index 92905fa..f0ba694 100644
--- a/nerve/gateway/server.py
+++ b/nerve/gateway/server.py
@@ -262,6 +262,33 @@ async def _periodic_notify_expiry():
 
     notify_expiry_task = asyncio.create_task(_periodic_notify_expiry())
 
+    # Periodic DB retention (opt-in). Compacts old memorized messages'
+    # blocks/thinking JSON and prunes append-only telemetry + file snapshots,
+    # then checkpoints the WAL. No-ops unless retention.enabled is set. The
+    # file shrink (VACUUM) is an explicit operator step (`nerve db vacuum`),
+    # never on this loop.
+    async def _periodic_db_retention():
+        if not config.retention.enabled:
+            return
+        interval = config.retention.interval_hours * 3600
+        while True:
+            await asyncio.sleep(interval)
+            try:
+                report = await db.run_retention(
+                    retention_days=config.retention.retention_days,
+                    retention_full_days=config.retention.retention_full_days,
+                )
+                if (
+                    report.get("messages_compacted")
+                    or report.get("telemetry_deleted")
+                    or report.get("snapshots_deleted")
+                ):
+                    logger.info("DB retention: %s", report)
+            except Exception as e:
+                logger.exception("DB retention failed: %s", e)
+
+    db_retention_task = asyncio.create_task(_periodic_db_retention())
+
     # Periodic backup (opt-in). Hourly tick; runs a bundle when the newest
     # one in the target dir is older than interval_hours (or none exists).
     # The heavy work (consistent DB snapshots + tar) runs in a thread so it
@@ -440,6 +467,7 @@ async def _periodic_backup():
     if cron_task:
         await cron_task.stop()
 
+    db_retention_task.cancel()
     notify_expiry_task.cancel()
     backup_task.cancel()
     idle_sweep_task.cancel()
diff --git a/tests/test_db_retention.py b/tests/test_db_retention.py
new file mode 100644
index 0000000..4e08ad6
--- /dev/null
+++ b/tests/test_db_retention.py
@@ -0,0 +1,322 @@
+"""Tests for nerve.db.maintenance — opt-in retention (compaction + pruning).
+
+Safety contract under test (see notes/repo-conventions/nerve/db-retention.md):
+compaction only drops blocks/thinking from messages that are old AND already
+past their session's memorize watermark AND in a non-starred, non-active
+session; it never touches content, never deletes rows, and is idempotent.
+Telemetry/snapshot pruning deletes only the targeted append-only tables.
+"""
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+from nerve.db import Database
+
+
+def _days_ago(n: float) -> str:
+    """A ``YYYY-MM-DD HH:MM:SS`` UTC timestamp ``n`` days in the past."""
+    return (datetime.now(timezone.utc) - timedelta(days=n)).strftime(
+        "%Y-%m-%d %H:%M:%S"
+    )
+
+
+async def _make_session(
+    db: Database,
+    sid: str,
+    *,
+    status: str = "idle",
+    starred: int = 0,
+    last_memorized_at: str | None = None,
+) -> None:
+    """Create session ``sid``, then set ``starred`` and, if given, the watermark."""
+    await db.create_session(sid, status=status)
+    fields: dict = {"starred": starred}
+    if last_memorized_at is not None:
+        fields["last_memorized_at"] = last_memorized_at
+    await db.update_session_fields(sid, fields)
+
+
+async def _add_msg(
+    db: Database,
+    sid: str,
+    *,
+    created_at: str,
+    content: str = "hello",
+) -> int:
+    """Insert an assistant message that has both ``thinking`` and ``blocks`` set."""
+    return await db.add_message(
+        sid,
+        role="assistant",
+        content=content,
+        thinking="internal reasoning",
+        blocks=[{"type": "text", "text": content}],
+        created_at=created_at,
+    )
+
+
+async def _blocks_thinking(db: Database, msg_id: int) -> tuple:
+    """Return the ``(blocks, thinking, content)`` columns of message ``msg_id``."""
+    async with db.db.execute(
+        "SELECT blocks, thinking, content FROM messages WHERE id = ?", (msg_id,)
+    ) as cur:
+        row = await cur.fetchone()
+    return row["blocks"], row["thinking"], row["content"]
+
+
+@pytest.mark.asyncio
+class TestCompaction:
+    async def test_compacts_old_memorized_nonstarred_inactive(self, db: Database):
+        """The eligible case: old + memorized + not starred + not active."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 1
+        assert report["bytes_reclaimed"] > 0
+        blocks, thinking, content = await _blocks_thinking(db, mid)
+        assert blocks is None
+        assert thinking is None
+        assert content == "hello"  # content is always kept
+
+    async def test_recent_message_untouched(self, db: Database):
+        """A message inside the full-retention window keeps its blocks."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(5))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 0
+        blocks, thinking, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+        assert thinking is not None
+
+    async def test_starred_session_untouched(self, db: Database):
+        """A starred session is skipped even when the message is old and memorized."""
+        await _make_session(db, "s1", starred=1, last_memorized_at=_days_ago(1))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 0
+        blocks, _, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+
+    async def test_active_session_untouched(self, db: Database):
+        """An ``active`` session is skipped even when the message is old and memorized."""
+        await _make_session(
+            db, "s1", status="active", last_memorized_at=_days_ago(1)
+        )
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 0
+        blocks, _, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+
+    async def test_unmemorized_message_untouched(self, db: Database):
+        """Watermark guard: a session never memorized (NULL watermark) is safe."""
+        await _make_session(db, "s1", last_memorized_at=None)
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 0
+        blocks, _, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+
+    async def test_message_newer_than_watermark_untouched(self, db: Database):
+        """A message after the watermark is not yet memorized; keep its blocks."""
+        # Watermark older than the message: msg(50d) is newer than wm(120d).
+        await _make_session(db, "s1", last_memorized_at=_days_ago(120))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(50))
+
+        report = await db.compact_messages(full_days=30)
+
+        assert report["messages_compacted"] == 0
+        blocks, _, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+
+    async def test_idempotent(self, db: Database):
+        """A second pass over the same data compacts nothing."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        first = await db.compact_messages(full_days=30)
+        second = await db.compact_messages(full_days=30)
+
+        assert first["messages_compacted"] == 1
+        assert second["messages_compacted"] == 0  # nothing left to compact
+
+    async def test_dry_run_reports_without_mutating(self, db: Database):
+        """``dry_run=True`` reports the eligible message but leaves its columns set."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+
+        report = await db.compact_messages(full_days=30, dry_run=True)
+
+        assert report["messages_compacted"] == 1
+        assert report["bytes_reclaimed"] > 0
+        blocks, thinking, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None  # not mutated
+        assert thinking is not None
+
+    async def test_row_count_unchanged(self, db: Database):
+        """Compaction nulls columns but never deletes message rows."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        await _add_msg(db, "s1", created_at=_days_ago(100))
+        await _add_msg(db, "s1", created_at=_days_ago(5))
+
+        async with db.db.execute("SELECT COUNT(*) FROM messages") as cur:
+            before = (await cur.fetchone())[0]
+        await db.compact_messages(full_days=30)
+        async with db.db.execute("SELECT COUNT(*) FROM messages") as cur:
+            after = (await cur.fetchone())[0]
+
+        assert before == after == 2
+
+
+async def _seed_telemetry(db: Database, ts: str) -> None:
+    """Insert one row at timestamp ``ts`` into each telemetry table."""
+    # NOTE(review): some tests call this helper twice, so create_session
+    # presumably tolerates an already-existing "tel" session — confirm.
+    await db.create_session("tel", status="idle")
+    await db.db.execute(
+        "INSERT INTO session_events (session_id, event_type, created_at) "
+        "VALUES (?, ?, ?)",
+        ("tel", "test", ts),
+    )
+    await db.db.execute(
+        "INSERT INTO mcp_tool_usage (server_name, tool_name, created_at) "
+        "VALUES (?, ?, ?)",
+        ("srv", "tool", ts),
+    )
+    await db.db.execute(
+        "INSERT INTO session_usage (session_id, created_at) VALUES (?, ?)",
+        ("tel", ts),
+    )
+    await db.db.execute(
+        "INSERT INTO memu_audit_log (action, target_type, timestamp) "
+        "VALUES (?, ?, ?)",
+        ("memorize", "session", ts),
+    )
+    await db.db.commit()
+
+
+async def _count(db: Database, table: str) -> int:
+    """Return ``COUNT(*)`` for ``table``."""
+    async with db.db.execute(f"SELECT COUNT(*) FROM {table}") as cur:
+        return (await cur.fetchone())[0]
+
+
+@pytest.mark.asyncio
+class TestTelemetryPrune:
+    async def test_deletes_old_keeps_new(self, db: Database):
+        """The old row is deleted from every telemetry table; the new row stays."""
+        await _seed_telemetry(db, _days_ago(200))  # old
+        await _seed_telemetry(db, _days_ago(1))  # new
+
+        report = await db.prune_telemetry(days=90)
+
+        assert report["telemetry_deleted"] == 4  # one old row per table
+        for table in (
+            "session_events",
+            "mcp_tool_usage",
+            "session_usage",
+            "memu_audit_log",
+        ):
+            assert report["by_table"][table] == 1
+            assert await _count(db, table) == 1  # the new row survives
+
+    async def test_dry_run_mutates_nothing(self, db: Database):
+        """``dry_run=True`` counts the old rows without deleting them."""
+        await _seed_telemetry(db, _days_ago(200))
+
+        report = await db.prune_telemetry(days=90, dry_run=True)
+
+        assert report["telemetry_deleted"] == 4
+        assert await _count(db, "session_events") == 1  # still there
+
+    async def test_preserves_core_tables(self, db: Database):
+        """Pruning telemetry leaves the row counts of the core tables unchanged."""
+        await _make_session(db, "core", last_memorized_at=_days_ago(1))
+        await _add_msg(db, "core", created_at=_days_ago(200))
+        await _seed_telemetry(db, _days_ago(200))
+
+        before = {
+            t: await _count(db, t)
+            for t in ("messages", "sessions", "tasks", "plans", "notifications")
+        }
+        await db.prune_telemetry(days=90)
+        after = {
+            t: await _count(db, t)
+            for t in ("messages", "sessions", "tasks", "plans", "notifications")
+        }
+
+        assert before == after
+
+
+@pytest.mark.asyncio
+class TestFileSnapshotPrune:
+    async def test_deletes_old_keeps_new(self, db: Database):
+        """Only the snapshot older than the window is deleted."""
+        await db.create_session("s1", status="idle")
+        await db.db.execute(
+            "INSERT INTO session_file_snapshots "
+            "(session_id, file_path, original_content, created_at) "
+            "VALUES (?, ?, ?, ?)",
+            ("s1", "/old.py", "x" * 100, _days_ago(200)),
+        )
+        await db.db.execute(
+            "INSERT INTO session_file_snapshots "
+            "(session_id, file_path, original_content, created_at) "
+            "VALUES (?, ?, ?, ?)",
+            ("s1", "/new.py", "y" * 100, _days_ago(1)),
+        )
+        await db.db.commit()
+
+        report = await db.prune_file_snapshots(days=90)
+
+        assert report["snapshots_deleted"] == 1
+        assert report["bytes_reclaimed"] > 0
+        assert await _count(db, "session_file_snapshots") == 1
+
+
+@pytest.mark.asyncio
+class TestRunRetention:
+    async def test_combined_report(self, db: Database):
+        """The merged report carries the compaction, telemetry and snapshot results."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        await _add_msg(db, "s1", created_at=_days_ago(100))
+        await _seed_telemetry(db, _days_ago(200))
+
+        report = await db.run_retention(
+            retention_days=90, retention_full_days=30
+        )
+
+        assert report["dry_run"] is False
+        assert report["messages_compacted"] == 1
+        assert report["message_bytes_reclaimed"] > 0
+        assert report["telemetry_deleted"] == 4
+        assert "by_table" in report
+        assert report["snapshots_deleted"] == 0
+
+    async def test_dry_run_mutates_nothing(self, db: Database):
+        """``dry_run=True`` reports what would change but mutates nothing."""
+        await _make_session(db, "s1", last_memorized_at=_days_ago(1))
+        mid = await _add_msg(db, "s1", created_at=_days_ago(100))
+        await _seed_telemetry(db, _days_ago(200))
+
+        report = await db.run_retention(
+            retention_days=90, retention_full_days=30, dry_run=True
+        )
+
+        assert report["dry_run"] is True
+        assert report["messages_compacted"] == 1
+        assert report["telemetry_deleted"] == 4
+        # nothing actually changed
+        blocks, _, _ = await _blocks_thinking(db, mid)
+        assert blocks is not None
+        assert await _count(db, "session_events") == 1