Files
CloudOps/platform/modules/cloud_backup.py

337 lines
12 KiB
Python

"""
modules/cloud_backup.py
Cloudflare R2 cloud backup integration via boto3 (S3-compatible API)
"""
import os
import time
import threading
from datetime import datetime, timezone
try:
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
BOTO3_AVAILABLE = True
except ImportError:
BOTO3_AVAILABLE = False
# Upload progress tracking (in-memory).
# Maps job_id -> {"status", "log", "progress", "uploaded", "started", ...};
# written by r2_upload_async() and _ProgressCallback, read by get_upload_job().
# NOTE(review): process-local only — lost on restart; presumably acceptable
# for UI progress display. Confirm against caller expectations.
_upload_jobs: dict[str, dict] = {}
def _get_r2_config() -> dict:
"""Read R2 config fresh from env vars every time (not cached at import)."""
return {
"account_id": os.environ.get("R2_ACCOUNT_ID", "35e00c230cc8066252a2d9890b69aea2"),
"access_key_id": os.environ.get("R2_ACCESS_KEY_ID", ""),
"secret_access_key": os.environ.get("R2_SECRET_ACCESS_KEY", ""),
"bucket_name": os.environ.get("R2_BUCKET_NAME", "navitrends-backups"),
}
# Module-level name used in app.py template rendering.
# NOTE: snapshotted once at import time, unlike _get_r2_config() which re-reads
# the environment on every call — a later env change will not update this name.
R2_BUCKET_NAME = os.environ.get("R2_BUCKET_NAME", "navitrends-backups")
def _get_r2_client():
    """Return a boto3 S3 client pointed at Cloudflare R2.

    Raises:
        RuntimeError: if boto3 is not installed or credentials are missing.
    """
    if not BOTO3_AVAILABLE:
        raise RuntimeError("boto3 is not installed. Run: pip install boto3")
    cfg = _get_r2_config()
    key_id = cfg["access_key_id"]
    secret = cfg["secret_access_key"]
    if not (key_id and secret):
        raise RuntimeError(
            "R2 credentials not configured. "
            "Set R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY environment variables."
        )
    # R2 exposes an S3-compatible endpoint keyed by the account id.
    endpoint = f"https://{cfg['account_id']}.r2.cloudflarestorage.com"
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
        region_name="auto",
    )
def r2_is_configured() -> bool:
    """Return True only if credentials are set and boto3 is available."""
    if not BOTO3_AVAILABLE:
        return False
    cfg = _get_r2_config()
    return bool(cfg["access_key_id"] and cfg["secret_access_key"])
def r2_test_connection() -> dict:
    """Probe R2 connectivity and report whether the configured bucket exists.

    Returns a dict with "success", "message", "bucket_exists" and, on
    success, the full list of bucket names under "buckets".
    """
    if not BOTO3_AVAILABLE:
        return {"success": False, "message": "boto3 not installed (pip install boto3)", "bucket_exists": False}
    cfg = _get_r2_config()
    if not (cfg["access_key_id"] and cfg["secret_access_key"]):
        return {"success": False, "message": "R2 credentials not set", "bucket_exists": False}
    try:
        listing = _get_r2_client().list_buckets()
        names = [entry["Name"] for entry in listing.get("Buckets", [])]
        target = cfg["bucket_name"]
        found = target in names
        return {
            "success": True,
            "message": f"Connected to R2. Bucket '{target}': {'exists' if found else 'not created yet'}",
            "bucket_exists": found,
            "buckets": names,
        }
    except NoCredentialsError:
        return {"success": False, "message": "Invalid R2 credentials", "bucket_exists": False}
    except Exception as e:
        return {"success": False, "message": str(e), "bucket_exists": False}
def r2_ensure_bucket() -> tuple[bool, str]:
    """Create the configured bucket if it does not already exist.

    Returns:
        (ok, message) — ok is False only when the bucket could not be
        verified or created.
    """
    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        try:
            client.head_bucket(Bucket=bucket)
        except ClientError as err:
            code = err.response["Error"]["Code"]
            if code not in ("404", "NoSuchBucket"):
                raise
            client.create_bucket(Bucket=bucket)
            return True, f"Bucket '{bucket}' created successfully"
        return True, f"Bucket '{bucket}' already exists"
    except Exception as e:
        return False, f"Failed to ensure bucket: {e}"
def r2_list_backups() -> list[dict]:
    """List backup archives stored under backups/ in R2, newest first.

    Returns [] when R2 is unconfigured or any API call fails (best-effort).
    """
    if not r2_is_configured():
        return []
    bucket = _get_r2_config()["bucket_name"]
    try:
        listing = _get_r2_client().list_objects_v2(Bucket=bucket, Prefix="backups/")
        entries = []
        for item in listing.get("Contents", []):
            full_key = item["Key"]
            # .sha256 companions are audit files, not backups themselves.
            if full_key.endswith(".sha256"):
                continue
            nbytes = item["Size"]
            stamp = item["LastModified"]
            entries.append({
                "key": full_key,
                "name": full_key.replace("backups/", ""),
                "size": nbytes,
                "size_human": _human_size(nbytes),
                "last_modified": stamp.isoformat(),
                "last_modified_str": stamp.strftime("%Y-%m-%d %H:%M UTC"),
            })
        entries.sort(key=lambda e: e["last_modified"], reverse=True)
        return entries
    except Exception:
        # Deliberate best-effort: any failure surfaces as an empty listing.
        return []
def r2_delete_backup(key: str) -> tuple[bool, str]:
    """Delete a single object from the R2 bucket.

    Returns:
        (ok, message) — message is the error text on failure.
    """
    bucket = _get_r2_config()["bucket_name"]
    try:
        _get_r2_client().delete_object(Bucket=bucket, Key=key)
    except Exception as e:
        return False, str(e)
    return True, f"Deleted {key} from R2"
def r2_get_bucket_stats() -> dict:
    """Aggregate count and total size of backup archives stored in R2.

    .sha256 checksum companions are excluded from both count and size.
    Failures degrade to zeroed stats rather than raising.
    """
    if not r2_is_configured():
        return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": False}
    bucket = _get_r2_config()["bucket_name"]
    try:
        listing = _get_r2_client().list_objects_v2(Bucket=bucket, Prefix="backups/")
        archives = [
            item for item in listing.get("Contents", [])
            if not item["Key"].endswith(".sha256")
        ]
        total = sum(item["Size"] for item in archives)
    except Exception:
        return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": True}
    return {
        "count": len(archives),
        "total_size": total,
        "total_size_human": _human_size(total),
        "configured": True,
    }
class _ProgressCallback:
    """Thread-safe progress callback for boto3 uploads.

    boto3's transfer manager invokes __call__ (possibly from worker
    threads) with the number of bytes just transferred.  Progress is
    mirrored into the matching _upload_jobs entry so get_upload_job()
    can report it to the UI.
    """

    def __init__(self, job_id: str, file_size: int):
        self._job_id = job_id          # key into _upload_jobs
        self._size = file_size         # total bytes expected
        self._uploaded = 0             # bytes transferred so far
        self._last_logged_pct = -1     # last percentage a log line was written for
        self._lock = threading.Lock()  # callbacks may arrive from multiple threads

    def __call__(self, bytes_amount: int):
        with self._lock:
            self._uploaded += bytes_amount
            pct = int(self._uploaded * 100 / self._size) if self._size else 0
            if self._job_id not in _upload_jobs:
                return
            _upload_jobs[self._job_id]["uploaded"] = self._uploaded
            _upload_jobs[self._job_id]["progress"] = pct
            # Fix: only append a log line when the integer percentage
            # advances.  The previous code logged on EVERY chunk callback,
            # so the in-memory log grew with one entry per transferred
            # chunk — thousands of lines for a large archive.
            if pct != self._last_logged_pct:
                self._last_logged_pct = pct
                _upload_jobs[self._job_id]["log"].append(
                    f" {_human_size(self._uploaded)} / {_human_size(self._size)} ({pct}%)"
                )
def r2_upload_async(local_path: str, job_id: str) -> None:
    """Upload local_path (plus its .sha256 companion, if present) to R2.

    Intended to run in a background thread; all progress, log lines and
    the final status are recorded under _upload_jobs[job_id].
    """
    job = {
        "status": "running",
        "log": [f"Starting upload: {os.path.basename(local_path)}"],
        "progress": 0,
        "uploaded": 0,
        "started": time.time(),
    }
    _upload_jobs[job_id] = job
    try:
        if not r2_is_configured():
            raise RuntimeError("R2 credentials not configured")
        bucket = _get_r2_config()["bucket_name"]
        ok, msg = r2_ensure_bucket()
        if not ok:
            raise RuntimeError(msg)
        job["log"].append(f"OK: {msg}")

        base_name = os.path.basename(local_path)
        file_size = os.path.getsize(local_path)
        object_key = f"backups/{base_name}"
        client = _get_r2_client()
        job["log"].append(
            f"Uploading {_human_size(file_size)} to r2://{bucket}/{object_key}"
        )
        client.upload_file(
            local_path,
            bucket,
            object_key,
            Callback=_ProgressCallback(job_id, file_size),
            ExtraArgs={"Metadata": {
                "uploaded-by": "navitrends-platform",
                "uploaded-at": datetime.now(timezone.utc).isoformat(),
                "original-file": base_name,
            }},
        )

        # Push the checksum companion too, if the backup step produced one.
        sha_path = local_path + ".sha256"
        if os.path.exists(sha_path):
            client.upload_file(sha_path, bucket, object_key + ".sha256")
            job["log"].append("SHA256 checksum uploaded")

        elapsed = round(time.time() - job["started"], 1)
        job["log"].append(
            f"Upload complete in {elapsed}s — r2://{bucket}/{object_key}"
        )
        job["status"] = "done"
        job["progress"] = 100
        job["object_key"] = object_key
    except Exception as e:
        job["log"].append(f"Upload failed: {e}")
        job["status"] = "error"
def get_upload_job(job_id: str) -> dict | None:
    """Return a UI-friendly snapshot of an upload job, or None if unknown."""
    job = _upload_jobs.get(job_id)
    if not job:
        return None
    # "elapsed" falls back to 0 if the job somehow has no start timestamp.
    started = job.get("started", time.time())
    return {
        "status": job["status"],
        "log": job["log"],
        "progress": job["progress"],
        "elapsed": round(time.time() - started),
    }
def _human_size(n: int) -> str:
for unit in ("B", "KB", "MB", "GB"):
if n < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} TB"
def r2_audit_backup(key: str) -> dict:
    """Audit a backup in R2 by verifying its SHA256 checksum and metadata.

    Runs five checks (object existence, checksum companion, upload
    metadata, size sanity, filename format) and returns a report dict:
    "backup", "status" ('healthy' | 'warning' | 'error'), "checks",
    "warnings", "errors", plus size/hash details when available.
    """
    import re  # only needed for the filename-format check

    cfg = _get_r2_config()
    bucket = cfg["bucket_name"]
    report = {
        "backup": key,
        "status": "unknown",
        "checks": [],
        "warnings": [],
        "errors": [],
    }
    try:
        client = _get_r2_client()

        # Check 1: Object exists
        try:
            head = client.head_object(Bucket=bucket, Key=key)
            size = head["ContentLength"]
            last_mod = head["LastModified"].strftime("%Y-%m-%d %H:%M UTC")
            report["checks"].append(f"✅ Object exists in R2 ({_human_size(size)}, modified {last_mod})")
            report["size"] = size
            report["size_human"] = _human_size(size)
            report["last_modified"] = last_mod
        except ClientError as e:
            report["errors"].append(f"❌ Object not found in R2: {e}")
            report["status"] = "error"
            return report

        # Check 2: SHA256 companion file
        sha_key = key + ".sha256"
        try:
            sha_obj = client.get_object(Bucket=bucket, Key=sha_key)
            sha_content = sha_obj["Body"].read().decode("utf-8").strip()
            stored_hash = sha_content.split()[0] if sha_content else ""
            report["checks"].append("✅ SHA256 checksum file found")
            # Fix: append an explicit ellipsis to mark truncation; the
            # original concatenated an empty string (apparently a
            # mojibake-lost "…"), which did nothing.
            report["stored_hash"] = (stored_hash[:16] + "...") if stored_hash else "N/A"
        except ClientError:
            report["warnings"].append("⚠️ No SHA256 checksum file found — integrity cannot be verified")
            report["stored_hash"] = None

        # Check 3: Upload metadata (written by r2_upload_async)
        metadata = head.get("Metadata", {})
        uploaded_by = metadata.get("uploaded-by", "")
        uploaded_at = metadata.get("uploaded-at", "")
        if uploaded_by:
            report["checks"].append(f"✅ Uploaded by: {uploaded_by}")
        if uploaded_at:
            report["checks"].append(f"✅ Upload timestamp: {uploaded_at[:19]}")
        if not uploaded_by and not uploaded_at:
            report["warnings"].append("⚠️ No upload metadata found (may have been uploaded outside the platform)")

        # Check 4: File size sanity (archives under 10 MB are suspicious)
        if size < 10 * 1024 * 1024:
            report["warnings"].append(f"⚠️ File is very small ({_human_size(size)}) — may be incomplete")
        else:
            report["checks"].append(f"✅ File size looks healthy ({_human_size(size)})")

        # Check 5: Filename format.  Fix: use fullmatch so trailing junk
        # (e.g. "...tar.gz.old") is flagged — re.match only anchors at the
        # start of the string.
        filename = key.replace("backups/", "")
        if re.fullmatch(r"myapps-backup-\d{8}_\d{6}\.tar\.gz", filename):
            report["checks"].append("✅ Filename format is valid")
        else:
            # Fix: interpolate the actual filename; the original f-string
            # contained no placeholder and printed a literal instead.
            report["warnings"].append(f"⚠️ Unexpected filename format: {filename}")

        # Final status: errors > warnings > healthy
        if report["errors"]:
            report["status"] = "error"
        elif report["warnings"]:
            report["status"] = "warning"
        else:
            report["status"] = "healthy"
    except Exception as e:
        report["errors"].append(f"❌ Audit failed: {e}")
        report["status"] = "error"
    return report