#!/usr/bin/env python3
"""Disk usage monitor: records samples in SQLite and posts to a Matrix room.

Usage: disk_monitor.py [report|sample]

* ``report`` stores a sample and always posts a storage summary.
* ``sample`` stores a sample and posts an alert when used space has grown by
  at least 1 GiB relative to a sample that is 10+ minutes old, or by at least
  10 GiB relative to a sample that is 60+ minutes old. Each alert has its own
  cooldown so repeated runs do not spam the room.

Configuration is read from the environment: ``DISKMON_DB``, ``DISKMON_MOUNT``
(both optional) and ``MATRIX_HOMESERVER``, ``MATRIX_ROOM_ID``,
``MATRIX_ACCESS_TOKEN`` (all required to send messages).
"""
import os
import sys
import time
import uuid
import json
import shutil
import sqlite3
import urllib.parse
import urllib.request

DB_PATH = os.environ.get("DISKMON_DB", "/var/lib/diskmon/diskmon.sqlite3")
MOUNT_PATH = os.environ.get("DISKMON_MOUNT", "/")

# Read with .get() so importing this module never raises; the values are
# validated in main() and send_matrix_message() before they are used.
MATRIX_HOMESERVER = os.environ.get("MATRIX_HOMESERVER", "").rstrip("/")
MATRIX_ROOM_ID = os.environ.get("MATRIX_ROOM_ID", "")
MATRIX_ACCESS_TOKEN = os.environ.get("MATRIX_ACCESS_TOKEN", "")

# thresholds
TEN_MIN_SECONDS = 10 * 60
ONE_HOUR_SECONDS = 60 * 60
ONE_GIB = 1024 ** 3
TEN_GIB = 10 * ONE_GIB

# cooldowns to avoid spam
WARNING_COOLDOWN = 30 * 60
CRITICAL_COOLDOWN = 60 * 60


def _missing_matrix_settings():
    """Return the names of required Matrix environment variables that are unset or empty."""
    settings = (
        ("MATRIX_HOMESERVER", MATRIX_HOMESERVER),
        ("MATRIX_ROOM_ID", MATRIX_ROOM_ID),
        ("MATRIX_ACCESS_TOKEN", MATRIX_ACCESS_TOKEN),
    )
    return [name for name, value in settings if not value]


def ensure_db(conn: sqlite3.Connection) -> None:
    """Create the ``samples`` and ``alerts`` tables if they do not exist, then commit."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS samples (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            mount TEXT NOT NULL,
            used_bytes INTEGER NOT NULL,
            avail_bytes INTEGER NOT NULL,
            total_bytes INTEGER NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS alerts (
            key TEXT PRIMARY KEY,
            last_sent_ts INTEGER NOT NULL
        )
    """)
    conn.commit()


def get_disk_usage(path: str):
    """Return ``(total, used, free)`` in bytes for *path* via ``shutil.disk_usage``."""
    usage = shutil.disk_usage(path)
    return usage.total, usage.used, usage.free


def bytes_human(n: int) -> str:
    """Format a byte count using binary units (B, KiB, ... PiB).

    Whole bytes are shown as an integer; larger units use one decimal place.
    Negative counts keep their sign and are scaled by magnitude. Values beyond
    the largest unit are expressed in PiB.
    """
    units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
    last = len(units) - 1
    sign = "-" if n < 0 else ""
    value = float(abs(n))

    index = 0
    while value >= 1024 and index < last:
        value /= 1024
        index += 1

    if index == 0:
        return f"{sign}{int(value)} B"

    # A value such as 1023.97 KiB would print as "1024.0 KiB" once rounded to
    # one decimal; move up a unit so it prints as "1.0 MiB" instead.
    if round(value, 1) >= 1024 and index < last:
        value /= 1024
        index += 1
    return f"{sign}{value:.1f} {units[index]}"


def percent_used(used: int, total: int) -> float:
    """Return *used* as a percentage of *total*, or 0.0 when *total* is zero."""
    if total == 0:
        return 0.0
    return (used / total) * 100.0


def insert_sample(conn: sqlite3.Connection, ts: int, mount: str, used: int, avail: int, total: int) -> None:
    """Insert one usage sample for *mount* taken at Unix time *ts* and commit."""
    conn.execute(
        "INSERT INTO samples (ts, mount, used_bytes, avail_bytes, total_bytes) VALUES (?, ?, ?, ?, ?)",
        (ts, mount, used, avail, total),
    )
    conn.commit()


def prune_old_samples(conn: sqlite3.Connection, now_ts: int) -> None:
    """Delete samples (for every mount) older than two days before *now_ts* and commit."""
    # keep 2 days of history, more than enough
    cutoff = now_ts - (2 * 24 * 60 * 60)
    conn.execute("DELETE FROM samples WHERE ts < ?", (cutoff,))
    conn.commit()


def get_oldest_sample_at_least(conn: sqlite3.Connection, mount: str, min_age_seconds: int, now_ts: int):
    """Return the most recent sample for *mount* that is at least *min_age_seconds* old.

    Despite the name, this picks the NEWEST row whose timestamp is at or before
    ``now_ts - min_age_seconds`` (``ORDER BY ts DESC LIMIT 1``), i.e. the sample
    closest to the start of the comparison window.

    There is no upper bound on the age: if sampling was interrupted, the
    returned row may be much older than *min_age_seconds*.

    Returns:
        A ``(ts, used_bytes, avail_bytes, total_bytes)`` tuple, or ``None``
        when no sample is old enough.
    """
    cutoff_ts = now_ts - min_age_seconds
    return conn.execute(
        """
        SELECT ts, used_bytes, avail_bytes, total_bytes
        FROM samples
        WHERE mount = ? AND ts <= ?
        ORDER BY ts DESC
        LIMIT 1
        """,
        (mount, cutoff_ts),
    ).fetchone()


def should_send_alert(conn: sqlite3.Connection, key: str, now_ts: int, cooldown_seconds: int) -> bool:
    """Return True if alert *key* was never sent or its cooldown has elapsed at *now_ts*."""
    row = conn.execute("SELECT last_sent_ts FROM alerts WHERE key = ?", (key,)).fetchone()
    if row is None:
        return True
    return (now_ts - row[0]) >= cooldown_seconds


def mark_alert_sent(conn: sqlite3.Connection, key: str, now_ts: int) -> None:
    """Record *now_ts* as the last-sent time for alert *key* (insert or update) and commit."""
    conn.execute(
        """
        INSERT INTO alerts (key, last_sent_ts)
        VALUES (?, ?)
        ON CONFLICT(key) DO UPDATE SET last_sent_ts = excluded.last_sent_ts
        """,
        (key, now_ts),
    )
    conn.commit()


def send_matrix_message(body: str) -> None:
    """Send *body* as an ``m.text`` message to the configured Matrix room.

    Raises:
        RuntimeError: if a required Matrix setting is missing, or the server
            answers with a non-2xx status that ``urlopen`` did not itself
            raise for.
        urllib.error.URLError: propagated from ``urlopen`` on request failure.
    """
    missing = _missing_matrix_settings()
    if missing:
        raise RuntimeError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    # A fresh transaction ID is generated for every call.
    txn_id = str(uuid.uuid4())
    # The room ID is percent-encoded in full so it is safe as a path segment.
    room_id = urllib.parse.quote(MATRIX_ROOM_ID, safe="")
    event_type = "m.room.message"
    url = f"{MATRIX_HOMESERVER}/_matrix/client/v3/rooms/{room_id}/send/{event_type}/{txn_id}"
    payload = {
        "msgtype": "m.text",
        "body": body,
    }
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={
            "Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req, timeout=20) as resp:
        if resp.status < 200 or resp.status >= 300:
            raise RuntimeError(f"Matrix send failed with HTTP {resp.status}")


def format_report(ts: int, mount: str, used: int, avail: int, total: int) -> str:
    """Build the multi-line storage report text; *ts* is rendered in local time."""
    pct = percent_used(used, total)
    local_time = time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(ts))
    return (
        f"[VPS Storage Report]\n"
        f"Mount: {mount}\n"
        f"Used: {bytes_human(used)}\n"
        f"Available: {bytes_human(avail)}\n"
        f"Total: {bytes_human(total)}\n"
        f"Usage: {pct:.1f}%\n"
        f"Timestamp: {local_time}"
    )


def format_change_alert(window_name: str, delta: int, prev_used: int, current_used: int, ts: int, mount: str) -> str:
    """Build the multi-line growth alert text; *ts* is rendered in local time."""
    local_time = time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(ts))
    return (
        f"[Storage Alert]\n"
        f"Mount: {mount}\n"
        f"Used space increased by {bytes_human(delta)} in {window_name}\n"
        f"Previous used: {bytes_human(prev_used)}\n"
        f"Current used: {bytes_human(current_used)}\n"
        f"Timestamp: {local_time}"
    )


def do_report(conn: sqlite3.Connection) -> None:
    """Record a sample for ``MOUNT_PATH``, prune old history, and post the report."""
    now_ts = int(time.time())
    total, used, avail = get_disk_usage(MOUNT_PATH)
    insert_sample(conn, now_ts, MOUNT_PATH, used, avail, total)
    prune_old_samples(conn, now_ts)
    send_matrix_message(format_report(now_ts, MOUNT_PATH, used, avail, total))


def _alert_on_growth(
    conn: sqlite3.Connection,
    now_ts: int,
    used: int,
    window_seconds: int,
    window_name: str,
    threshold_bytes: int,
    alert_key: str,
    cooldown_seconds: int,
) -> None:
    """Send one growth alert if usage rose by *threshold_bytes* over the window and the cooldown allows."""
    baseline = get_oldest_sample_at_least(conn, MOUNT_PATH, window_seconds, now_ts)
    if baseline is None:
        return
    _baseline_ts, prev_used, _prev_avail, _prev_total = baseline
    delta = used - prev_used
    if delta < threshold_bytes:
        return
    if not should_send_alert(conn, alert_key, now_ts, cooldown_seconds):
        return
    send_matrix_message(format_change_alert(window_name, delta, prev_used, used, now_ts, MOUNT_PATH))
    # Only mark as sent after the message went out, so a failed send is retried.
    mark_alert_sent(conn, alert_key, now_ts)


def do_sample(conn: sqlite3.Connection) -> None:
    """Record a sample for ``MOUNT_PATH``, prune old history, and run both growth checks.

    The 10-minute warning is evaluated before the 60-minute critical check; an
    exception raised while sending the warning prevents the critical check.
    """
    now_ts = int(time.time())
    total, used, avail = get_disk_usage(MOUNT_PATH)
    insert_sample(conn, now_ts, MOUNT_PATH, used, avail, total)
    prune_old_samples(conn, now_ts)

    _alert_on_growth(
        conn, now_ts, used,
        TEN_MIN_SECONDS, "10 minutes", ONE_GIB, "warn_10m", WARNING_COOLDOWN,
    )
    _alert_on_growth(
        conn, now_ts, used,
        ONE_HOUR_SECONDS, "60 minutes", TEN_GIB, "crit_60m", CRITICAL_COOLDOWN,
    )


def main():
    """Command-line entry point: validate arguments and config, then run the chosen mode."""
    if len(sys.argv) != 2 or sys.argv[1] not in {"report", "sample"}:
        print("Usage: disk_monitor.py [report|sample]", file=sys.stderr)
        sys.exit(2)

    missing = _missing_matrix_settings()
    if missing:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit("Missing required environment variable(s): " + ", ".join(missing))

    # dirname is "" for a bare filename; os.makedirs("") would raise.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        ensure_db(conn)
        if sys.argv[1] == "report":
            do_report(conn)
        else:
            do_sample(conn)
    finally:
        conn.close()


if __name__ == "__main__":
    main()